On 4/5/20 10:55 AM, jer...@marvell.com wrote:
> From: Jerin Jacob <jer...@marvell.com>
>
> Adding implementation for rte_graph_walk() API. This will perform a walk
> on the circular buffer and call the process function of each node
> and collect the stats if stats collection is enabled.
>
> Signed-off-by: Jerin Jacob <jer...@marvell.com>
> Signed-off-by: Kiran Kumar K <kirankum...@marvell.com>
> Signed-off-by: Pavan Nikhilesh <pbhagavat...@marvell.com>
> Signed-off-by: Nithin Dabilpuram <ndabilpu...@marvell.com>
> ---
[...]
> +__rte_experimental
> +static inline void
> +rte_graph_walk(struct rte_graph *graph)
> +{
> +	const rte_graph_off_t *cir_start = graph->cir_start;
> +	const rte_node_t mask = graph->cir_mask;
> +	uint32_t head = graph->head;
> +	struct rte_node *node;
> +	uint64_t start;
> +	uint16_t rc;
> +	void **objs;
> +
> +	/*
> +	 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
> +	 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
> +	 * in a circular buffer fashion.
> +	 *
> +	 *	+-----+ <= cir_start - head [number of source nodes]
> +	 *	|     |
> +	 *	| ... | <= source nodes
> +	 *	|     |
> +	 *	+-----+ <= cir_start [head = 0] [tail = 0]
> +	 *	|     |
> +	 *	| ... | <= pending streams
> +	 *	|     |
> +	 *	+-----+ <= cir_start + mask
> +	 */
> +	while (likely(head != graph->tail)) {
> +		node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
> +		RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> +		objs = node->objs;
> +		rte_prefetch0(objs);
> +
> +		if (rte_graph_has_stats_feature()) {
> +			start = rte_rdtsc();
> +			rc = node->process(graph, node, objs, node->idx);
> +			node->total_cycles += rte_rdtsc() - start;
> +			node->total_calls++;
> +			node->total_objs += rc;
> +		} else {
> +			node->process(graph, node, objs, node->idx);
> +		}
> +		node->idx = 0;

So I guess it is the responsibility of the node's process function to
handle all objects. What should it do if that is not possible? E.g. after
tx_burst we usually drop packets; how do you drop objects in a graph? Do
you simply free them (does the node know how the object was allocated?)
or do you need to pass them to a "null" node?

The process function returns the number of objects processed (e.g. in the
later RX/TX nodes); why is it not used here?

> +		head = likely((int32_t)head > 0) ? head & mask : head;
> +	}
> +	graph->tail = 0;
> +}
[...]
> +__rte_experimental
> +static inline void
> +rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
> +		 rte_edge_t next, void **objs, uint16_t nb_objs)
> +{
> +	node = __rte_node_next_node_get(node, next);
> +	const uint16_t idx = node->idx;
> +
> +	__rte_node_enqueue_prologue(graph, node, idx, nb_objs);
> +
> +	rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
> +	node->idx = idx + nb_objs;
> +}

I see how it works for the usual scenario. But is there some kind of
fork/join operation? I am trying to imagine a scenario where I have a
bunch of co-processors doing different operations on a given
object/packet. To minimize latency I'd like to use them in parallel and,
when all are done, enqueue the object to the next node. Is there support
for that in the graph lib, or should that be handled in the process
function of a single node?

[...]
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue objs to multiple next nodes for further processing and
> + * set the next nodes to pending state in the circular buffer.
> + * objs[i] will be enqueued to nexts[i].
> + *
> + * @param graph
> + *   Graph pointer returned from rte_graph_lookup().
> + * @param node
> + *   Current node pointer.
> + * @param nexts
> + *   List of relative next node indices to enqueue objs.
> + * @param objs
> + *   List of objs to enqueue.
> + * @param nb_objs
> + *   Number of objs to enqueue.
> + */
> +__rte_experimental
> +static inline void
> +rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
> +		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
> +{
> +	uint16_t i;
> +
> +	for (i = 0; i < nb_objs; i++)
> +		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);

I have noticed the comments about the x1/2/4 functions, but since you
defended their performance there, why not make some use of them here
(Duff's-device-like unrolling), just like you have in your performance
test?

[...]
> +__rte_experimental
> +static inline void **
> +rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
> +			 rte_edge_t next, uint16_t nb_objs)
> +{
> +	node = __rte_node_next_node_get(node, next);
> +	const uint16_t idx = node->idx;
> +	uint16_t free_space = node->size - idx;
> +
> +	if (unlikely(free_space < nb_objs))
> +		__rte_node_stream_alloc_size(graph, node, nb_objs);
> +
> +	return &node->objs[idx];
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Put the next stream to pending state in the circular buffer
> + * for further processing. Should be invoked followed by
> + * rte_node_next_stream_get().

Is the last sentence correct?

> + *
> + * @param graph
> + *   Graph pointer returned from rte_graph_lookup().
> + * @param node
> + *   Current node pointer.
> + * @param next
> + *   Relative next node index..
> + * @param idx
> + *   Number of objs updated in the stream after getting the stream using
> + *   rte_node_next_stream_get.
> + *
> + * @see rte_node_next_stream_get().
> + */
> +__rte_experimental
> +static inline void
> +rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
> +			 rte_edge_t next, uint16_t idx)

I understood the stream_move as an optimized enqueue, but could you
describe how these _get/put are meant to be used, and why not use
stream_move/enqueue?
I see in later (testing) patches that it is used for source nodes, and
I'm not sure I understand the difference between them.

With regards
Andrzej Ostruszka
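
P.S. To make the unrolling question about rte_node_enqueue_next() a bit
more concrete, here is a rough, self-contained sketch of the idea. It
deliberately does not use the real graph headers: edge_t, struct
toy_node and the toy_enqueue_x1()/toy_enqueue_x4() helpers are
hypothetical stand-ins invented for illustration, modelling only "append
objs to the stream of edge next". The loop takes objs four at a time and
uses the x4 variant when all four share the same edge, falling back to
x1 otherwise and for the tail. Whether this beats the plain loop would
have to be measured.

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical stand-ins, NOT the real rte_graph types. */
typedef uint16_t edge_t;

#define TOY_MAX_EDGES 4
#define TOY_STREAM_SZ 64

struct toy_node {
	void *objs[TOY_MAX_EDGES][TOY_STREAM_SZ]; /* one stream per edge */
	uint16_t idx[TOY_MAX_EDGES];              /* fill level per edge */
};

/* Append one obj to the stream of edge 'next' (toy: no bounds checks). */
static inline void
toy_enqueue_x1(struct toy_node *node, edge_t next, void *obj)
{
	node->objs[next][node->idx[next]++] = obj;
}

/* Append four objs to the same edge in one call. */
static inline void
toy_enqueue_x4(struct toy_node *node, edge_t next,
	       void *o0, void *o1, void *o2, void *o3)
{
	void **s = &node->objs[next][node->idx[next]];

	s[0] = o0;
	s[1] = o1;
	s[2] = o2;
	s[3] = o3;
	node->idx[next] += 4;
}

/* objs[i] goes to nexts[i]; a group of four sharing one edge uses x4. */
static inline void
toy_enqueue_next(struct toy_node *node, const edge_t *nexts,
		 void **objs, uint16_t nb_objs)
{
	uint16_t i = 0;

	for (; i + 4 <= nb_objs; i += 4) {
		const edge_t n = nexts[i];

		if (nexts[i + 1] == n && nexts[i + 2] == n &&
		    nexts[i + 3] == n) {
			toy_enqueue_x4(node, n, objs[i], objs[i + 1],
				       objs[i + 2], objs[i + 3]);
		} else {
			/* Mixed edges: keep per-object order with x1. */
			toy_enqueue_x1(node, nexts[i], objs[i]);
			toy_enqueue_x1(node, nexts[i + 1], objs[i + 1]);
			toy_enqueue_x1(node, nexts[i + 2], objs[i + 2]);
			toy_enqueue_x1(node, nexts[i + 3], objs[i + 3]);
		}
	}

	/* Tail of fewer than four objs. */
	for (; i < nb_objs; i++)
		toy_enqueue_x1(node, nexts[i], objs[i]);
}
```

The per-edge order of objs is the same as with the plain x1 loop; only
the number of calls changes when neighbouring objs share an edge.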