Add packet reception and header extraction instructions. The RX must
be the first pipeline instruction. Each extracted header is logically
removed from the packet, then it can be read/written by instructions,
emitted into the outgoing packet or discarded.

Signed-off-by: Cristian Dumitrescu <cristian.dumitre...@intel.com>
---
 lib/librte_pipeline/rte_pipeline_version.map |   1 +
 lib/librte_pipeline/rte_swx_pipeline.c       | 564 ++++++++++++++++++-
 lib/librte_pipeline/rte_swx_pipeline.h       |  13 +
 3 files changed, 574 insertions(+), 4 deletions(-)

diff --git a/lib/librte_pipeline/rte_pipeline_version.map 
b/lib/librte_pipeline/rte_pipeline_version.map
index 7139df0d3..793957eb9 100644
--- a/lib/librte_pipeline/rte_pipeline_version.map
+++ b/lib/librte_pipeline/rte_pipeline_version.map
@@ -73,6 +73,7 @@ EXPERIMENTAL {
        rte_swx_pipeline_instructions_config;
        rte_swx_pipeline_build;
        rte_swx_pipeline_free;
+       rte_swx_pipeline_run;
        rte_swx_pipeline_table_state_get;
        rte_swx_pipeline_table_state_set;
 };
diff --git a/lib/librte_pipeline/rte_swx_pipeline.c 
b/lib/librte_pipeline/rte_swx_pipeline.c
index 2ae6229d0..d7af80e39 100644
--- a/lib/librte_pipeline/rte_swx_pipeline.c
+++ b/lib/librte_pipeline/rte_swx_pipeline.c
@@ -8,6 +8,7 @@
 #include <sys/queue.h>
 
 #include <rte_common.h>
+#include <rte_prefetch.h>
 
 #include "rte_swx_pipeline.h"
 #include "rte_swx_ctl.h"
@@ -21,6 +22,16 @@ do {                                                         
                  \
 #define CHECK_NAME(name, err_code)                                             
\
        CHECK((name) && (name)[0], err_code)
 
+#ifndef TRACE_LEVEL
+#define TRACE_LEVEL 0
+#endif
+
+#if TRACE_LEVEL
+#define TRACE(...) printf(__VA_ARGS__)
+#else
+#define TRACE(...)
+#endif
+
 /*
  * Struct.
  */
@@ -181,7 +192,64 @@ struct header_out_runtime {
 /*
  * Instruction.
  */
+
+/* Packet headers are always in Network Byte Order (NBO), i.e. big endian.
+ * Packet meta-data fields are always assumed to be in Host Byte Order (HBO).
+ * Table entry fields can be in either NBO or HBO; they are assumed to be in 
HBO
+ * when transferred to packet meta-data and in NBO when transferred to packet
+ * headers.
+ */
+
+/* Notation conventions:
+ *    -Header field: H = h.header.field (dst/src)
+ *    -Meta-data field: M = m.field (dst/src)
+ *    -Extern object mailbox field: E = e.field (dst/src)
+ *    -Extern function mailbox field: F = f.field (dst/src)
+ *    -Table action data field: T = t.field (src only)
+ *    -Immediate value: I = 32-bit unsigned value (src only)
+ */
+
+enum instruction_type {
+       /* rx m.port_in */
+       INSTR_RX,
+
+       /* extract h.header */
+       INSTR_HDR_EXTRACT,
+       INSTR_HDR_EXTRACT2,
+       INSTR_HDR_EXTRACT3,
+       INSTR_HDR_EXTRACT4,
+       INSTR_HDR_EXTRACT5,
+       INSTR_HDR_EXTRACT6,
+       INSTR_HDR_EXTRACT7,
+       INSTR_HDR_EXTRACT8,
+};
+
+struct instr_io {
+       struct {
+               uint8_t offset;
+               uint8_t n_bits;
+               uint8_t pad[2];
+       } io;
+
+       struct {
+               uint8_t header_id[8];
+               uint8_t struct_id[8];
+               uint8_t n_bytes[8];
+       } hdr;
+};
+
 struct instruction {
+       enum instruction_type type;
+       union {
+               struct instr_io io;
+       };
+};
+
+struct instruction_data {
+       char label[RTE_SWX_NAME_SIZE];
+       char jmp_label[RTE_SWX_NAME_SIZE];
+       uint32_t n_users; /* user = jmp instruction to this instruction. */
+       int invalid;
 };
 
 /*
@@ -251,6 +319,10 @@ struct table_runtime {
  * Pipeline.
  */
 struct thread {
+       /* Packet. */
+       struct rte_swx_pkt pkt;
+       uint8_t *ptr;
+
        /* Structures. */
        uint8_t **structs;
 
@@ -280,6 +352,29 @@ struct thread {
        struct instruction *ret;
 };
 
+#define MASK64_BIT_GET(mask, pos) ((mask) & (1LLU << (pos)))
+#define MASK64_BIT_SET(mask, pos) ((mask) | (1LLU << (pos)))
+#define MASK64_BIT_CLR(mask, pos) ((mask) & ~(1LLU << (pos)))
+
+#define METADATA_READ(thread, offset, n_bits)                                  
\
+({                                                                             
\
+       uint64_t *m64_ptr = (uint64_t *)&(thread)->metadata[offset];           \
+       uint64_t m64 = *m64_ptr;                                               \
+       uint64_t m64_mask = UINT64_MAX >> (64 - (n_bits));                     \
+       (m64 & m64_mask);                                                      \
+})
+
+#define METADATA_WRITE(thread, offset, n_bits, value)                          
\
+{                                                                              
\
+       uint64_t *m64_ptr = (uint64_t *)&(thread)->metadata[offset];           \
+       uint64_t m64 = *m64_ptr;                                               \
+       uint64_t m64_mask = UINT64_MAX >> (64 - (n_bits));                     \
+                                                                              \
+       uint64_t m_new = value;                                                \
+                                                                              \
+       *m64_ptr = (m64 & ~m64_mask) | (m_new & m64_mask);                     \
+}
+
 #ifndef RTE_SWX_PIPELINE_THREADS_MAX
 #define RTE_SWX_PIPELINE_THREADS_MAX 16
 #endif
@@ -315,6 +410,8 @@ struct rte_swx_pipeline {
        uint32_t n_actions;
        uint32_t n_tables;
        uint32_t n_headers;
+       uint32_t thread_id;
+       uint32_t port_id;
        uint32_t n_instructions;
        int build_done;
        int numa_node;
@@ -1187,6 +1284,16 @@ header_find(struct rte_swx_pipeline *p, const char *name)
        return NULL;
 }
 
+static struct header *
+header_parse(struct rte_swx_pipeline *p,
+            const char *name)
+{
+       if (name[0] != 'h' || name[1] != '.')
+               return NULL;
+
+       return header_find(p, &name[2]);
+}
+
 static struct field *
 header_field_parse(struct rte_swx_pipeline *p,
                   const char *name,
@@ -1430,19 +1537,459 @@ metadata_free(struct rte_swx_pipeline *p)
 /*
  * Instruction.
  */
+static inline void
+pipeline_port_inc(struct rte_swx_pipeline *p)
+{
+       p->port_id = (p->port_id + 1) & (p->n_ports_in - 1);
+}
+
 static inline void
 thread_ip_reset(struct rte_swx_pipeline *p, struct thread *t)
 {
        t->ip = p->instructions;
 }
 
+static inline void
+thread_ip_inc(struct rte_swx_pipeline *p);
+
+static inline void
+thread_ip_inc(struct rte_swx_pipeline *p)
+{
+       struct thread *t = &p->threads[p->thread_id];
+
+       t->ip++;
+}
+
+static inline void
+thread_ip_inc_cond(struct thread *t, int cond)
+{
+       t->ip += cond;
+}
+
+static inline void
+thread_yield(struct rte_swx_pipeline *p)
+{
+       p->thread_id = (p->thread_id + 1) & (RTE_SWX_PIPELINE_THREADS_MAX - 1);
+}
+
+/*
+ * rx.
+ */
+static int
+instr_rx_translate(struct rte_swx_pipeline *p,
+                  struct action *action,
+                  char **tokens,
+                  int n_tokens,
+                  struct instruction *instr,
+                  struct instruction_data *data __rte_unused)
+{
+       struct field *f;
+
+       CHECK(!action, EINVAL);
+       CHECK(n_tokens == 2, EINVAL);
+
+       f = metadata_field_parse(p, tokens[1]);
+       CHECK(f, EINVAL);
+
+       instr->type = INSTR_RX;
+       instr->io.io.offset = f->offset / 8;
+       instr->io.io.n_bits = f->n_bits;
+       return 0;
+}
+
+static inline void
+instr_rx_exec(struct rte_swx_pipeline *p);
+
+static inline void
+instr_rx_exec(struct rte_swx_pipeline *p)
+{
+       struct thread *t = &p->threads[p->thread_id];
+       struct instruction *ip = t->ip;
+       struct port_in_runtime *port = &p->in[p->port_id];
+       struct rte_swx_pkt *pkt = &t->pkt;
+       int pkt_received;
+
+       /* Packet. */
+       pkt_received = port->pkt_rx(port->obj, pkt);
+       t->ptr = &pkt->pkt[pkt->offset];
+       rte_prefetch0(t->ptr);
+
+       TRACE("[Thread %2u] rx %s from port %u\n",
+             p->thread_id,
+             pkt_received ? "1 pkt" : "0 pkts",
+             p->port_id);
+
+       /* Headers. */
+       t->valid_headers = 0;
+       t->n_headers_out = 0;
+
+       /* Meta-data. */
+       METADATA_WRITE(t, ip->io.io.offset, ip->io.io.n_bits, p->port_id);
+
+       /* Tables. */
+       t->table_state = p->table_state;
+
+       /* Thread. */
+       pipeline_port_inc(p);
+       thread_ip_inc_cond(t, pkt_received);
+       thread_yield(p);
+}
+
+/*
+ * extract.
+ */
+static int
+instr_hdr_extract_translate(struct rte_swx_pipeline *p,
+                           struct action *action,
+                           char **tokens,
+                           int n_tokens,
+                           struct instruction *instr,
+                           struct instruction_data *data __rte_unused)
+{
+       struct header *h;
+
+       CHECK(!action, EINVAL);
+       CHECK(n_tokens == 2, EINVAL);
+
+       h = header_parse(p, tokens[1]);
+       CHECK(h, EINVAL);
+
+       instr->type = INSTR_HDR_EXTRACT;
+       instr->io.hdr.header_id[0] = h->id;
+       instr->io.hdr.struct_id[0] = h->struct_id;
+       instr->io.hdr.n_bytes[0] = h->st->n_bits / 8;
+       return 0;
+}
+
+static inline void
+__instr_hdr_extract_exec(struct rte_swx_pipeline *p, uint32_t n_extract);
+
+static inline void
+__instr_hdr_extract_exec(struct rte_swx_pipeline *p, uint32_t n_extract)
+{
+       struct thread *t = &p->threads[p->thread_id];
+       struct instruction *ip = t->ip;
+       uint64_t valid_headers = t->valid_headers;
+       uint8_t *ptr = t->ptr;
+       uint32_t offset = t->pkt.offset;
+       uint32_t length = t->pkt.length;
+       uint32_t i;
+
+       for (i = 0; i < n_extract; i++) {
+               uint32_t header_id = ip->io.hdr.header_id[i];
+               uint32_t struct_id = ip->io.hdr.struct_id[i];
+               uint32_t n_bytes = ip->io.hdr.n_bytes[i];
+
+               TRACE("[Thread %2u]: extract header %u (%u bytes)\n",
+                     p->thread_id,
+                     header_id,
+                     n_bytes);
+
+               /* Headers. */
+               t->structs[struct_id] = ptr;
+               valid_headers = MASK64_BIT_SET(valid_headers, header_id);
+
+               /* Packet. */
+               offset += n_bytes;
+               length -= n_bytes;
+               ptr += n_bytes;
+       }
+
+       /* Headers. */
+       t->valid_headers = valid_headers;
+
+       /* Packet. */
+       t->pkt.offset = offset;
+       t->pkt.length = length;
+       t->ptr = ptr;
+}
+
+static inline void
+instr_hdr_extract_exec(struct rte_swx_pipeline *p)
+{
+       __instr_hdr_extract_exec(p, 1);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract2_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 2 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 2);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract3_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 3 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 3);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract4_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 4 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 4);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract5_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 5 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 5);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract6_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 6 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 6);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract7_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 7 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 7);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+static inline void
+instr_hdr_extract8_exec(struct rte_swx_pipeline *p)
+{
+       TRACE("[Thread %2u] *** The next 8 instructions are fused. ***\n",
+             p->thread_id);
+
+       __instr_hdr_extract_exec(p, 8);
+
+       /* Thread. */
+       thread_ip_inc(p);
+}
+
+#define RTE_SWX_INSTRUCTION_TOKENS_MAX 16
+
+static int
+instr_translate(struct rte_swx_pipeline *p,
+               struct action *action,
+               char *string,
+               struct instruction *instr,
+               struct instruction_data *data)
+{
+       char *tokens[RTE_SWX_INSTRUCTION_TOKENS_MAX];
+       int n_tokens = 0, tpos = 0;
+
+       /* Parse the instruction string into tokens. */
+       for ( ; ; ) {
+               char *token;
+
+               token = strtok_r(string, " \t\v", &string);
+               if (!token)
+                       break;
+
+               CHECK(n_tokens < RTE_SWX_INSTRUCTION_TOKENS_MAX, EINVAL);
+
+               tokens[n_tokens] = token;
+               n_tokens++;
+       }
+
+       CHECK(n_tokens, EINVAL);
+
+       /* Handle the optional instruction label. */
+       if ((n_tokens >= 2) && !strcmp(tokens[1], ":")) {
+               strcpy(data->label, tokens[0]);
+
+               tpos += 2;
+               CHECK(n_tokens - tpos, EINVAL);
+       }
+
+       /* Identify the instruction type. */
+       if (!strcmp(tokens[tpos], "rx"))
+               return instr_rx_translate(p,
+                                         action,
+                                         &tokens[tpos],
+                                         n_tokens - tpos,
+                                         instr,
+                                         data);
+
+       if (!strcmp(tokens[tpos], "extract"))
+               return instr_hdr_extract_translate(p,
+                                                  action,
+                                                  &tokens[tpos],
+                                                  n_tokens - tpos,
+                                                  instr,
+                                                  data);
+
+       CHECK(0, EINVAL);
+}
+
+static uint32_t
+label_is_used(struct instruction_data *data, uint32_t n, const char *label)
+{
+       uint32_t count = 0, i;
+
+       if (!label[0])
+               return 0;
+
+       for (i = 0; i < n; i++)
+               if (!strcmp(label, data[i].jmp_label))
+                       count++;
+
+       return count;
+}
+
 static int
-instruction_config(struct rte_swx_pipeline *p __rte_unused,
-                  struct action *a __rte_unused,
-                  const char **instructions __rte_unused,
-                  uint32_t n_instructions __rte_unused)
+instr_label_check(struct instruction_data *instruction_data,
+                 uint32_t n_instructions)
 {
+       uint32_t i;
+
+       /* Check that all instruction labels are unique. */
+       for (i = 0; i < n_instructions; i++) {
+               struct instruction_data *data = &instruction_data[i];
+               char *label = data->label;
+               uint32_t j;
+
+               if (!label[0])
+                       continue;
+
+               for (j = i + 1; j < n_instructions; j++)
+                       CHECK(strcmp(label, data[j].label), EINVAL);
+       }
+
+       /* Get users for each instruction label. */
+       for (i = 0; i < n_instructions; i++) {
+               struct instruction_data *data = &instruction_data[i];
+               char *label = data->label;
+
+               data->n_users = label_is_used(instruction_data,
+                                             n_instructions,
+                                             label);
+       }
+
+       return 0;
+}
+
+static int
+instruction_config(struct rte_swx_pipeline *p,
+                  struct action *a,
+                  const char **instructions,
+                  uint32_t n_instructions)
+{
+       struct instruction *instr = NULL;
+       struct instruction_data *data = NULL;
+       char *string = NULL;
+       int err = 0;
+       uint32_t i;
+
+       CHECK(n_instructions, EINVAL);
+       CHECK(instructions, EINVAL);
+       for (i = 0; i < n_instructions; i++)
+               CHECK(instructions[i], EINVAL);
+
+       /* Memory allocation. */
+       instr = calloc(n_instructions, sizeof(struct instruction));
+       if (!instr) {
+               err = ENOMEM;
+               goto error;
+       }
+
+       data = calloc(n_instructions, sizeof(struct instruction_data));
+       if (!data) {
+               err = ENOMEM;
+               goto error;
+       }
+
+       for (i = 0; i < n_instructions; i++) {
+               string = strdup(instructions[i]);
+               if (!string) {
+                       err = ENOMEM;
+                       goto error;
+               }
+
+               err = instr_translate(p, a, string, &instr[i], &data[i]);
+               if (err)
+                       goto error;
+
+               free(string);
+       }
+
+       err = instr_label_check(data, n_instructions);
+       if (err)
+               goto error;
+
+       free(data);
+
+       if (a) {
+               a->instructions = instr;
+               a->n_instructions = n_instructions;
+       } else {
+               p->instructions = instr;
+               p->n_instructions = n_instructions;
+       }
+
        return 0;
+
+error:
+       free(string);
+       free(data);
+       free(instr);
+       return err;
+}
+
+typedef void (*instr_exec_t)(struct rte_swx_pipeline *);
+
+static instr_exec_t instruction_table[] = {
+       [INSTR_RX] = instr_rx_exec,
+
+       [INSTR_HDR_EXTRACT] = instr_hdr_extract_exec,
+       [INSTR_HDR_EXTRACT2] = instr_hdr_extract2_exec,
+       [INSTR_HDR_EXTRACT3] = instr_hdr_extract3_exec,
+       [INSTR_HDR_EXTRACT4] = instr_hdr_extract4_exec,
+       [INSTR_HDR_EXTRACT5] = instr_hdr_extract5_exec,
+       [INSTR_HDR_EXTRACT6] = instr_hdr_extract6_exec,
+       [INSTR_HDR_EXTRACT7] = instr_hdr_extract7_exec,
+       [INSTR_HDR_EXTRACT8] = instr_hdr_extract8_exec,
+};
+
+static inline void
+instr_exec(struct rte_swx_pipeline *p)
+{
+       struct thread *t = &p->threads[p->thread_id];
+       struct instruction *ip = t->ip;
+       instr_exec_t instr = instruction_table[ip->type];
+
+       instr(p);
 }
 
 /*
@@ -2226,6 +2773,15 @@ rte_swx_pipeline_build(struct rte_swx_pipeline *p)
        return status;
 }
 
+void
+rte_swx_pipeline_run(struct rte_swx_pipeline *p, uint32_t n_instructions)
+{
+       uint32_t i;
+
+       for (i = 0; i < n_instructions; i++)
+               instr_exec(p);
+}
+
 /*
  * Control.
  */
diff --git a/lib/librte_pipeline/rte_swx_pipeline.h 
b/lib/librte_pipeline/rte_swx_pipeline.h
index 47a0f8dcc..fb83a8820 100644
--- a/lib/librte_pipeline/rte_swx_pipeline.h
+++ b/lib/librte_pipeline/rte_swx_pipeline.h
@@ -534,6 +534,19 @@ __rte_experimental
 int
 rte_swx_pipeline_build(struct rte_swx_pipeline *p);
 
+/**
+ * Pipeline run
+ *
+ * @param[in] p
+ *   Pipeline handle.
+ * @param[in] n_instructions
+ *   Number of instructions to execute.
+ */
+__rte_experimental
+void
+rte_swx_pipeline_run(struct rte_swx_pipeline *p,
+                    uint32_t n_instructions);
+
 /**
  * Pipeline free
  *
-- 
2.17.1

Reply via email to