On 01/05/2018 04:25 AM, Jon Maloy wrote:
> In the current implementation, a group socket receiving topology
> events about other members just converts the topology event message
> into a group event message and stores it until it reaches the right
> state to issue it to the user. This complicates the code unnecessarily,
> and becomes impractical when we in the coming commmits will need to

s/commmits/commits

> create and issue membership events indepenedently.

s/indepenedently/independently

> 
> In this commit, we change this so that we just notice the type and
> origin of the incoming topology event, and then drop the buffer. Only
> when it is time to actually send a group event to the user do we
> explicitly create a new message and send it upwards.
> 
> Signed-off-by: Jon Maloy <[email protected]>
> ---
>  net/tipc/group.c | 94 
> +++++++++++++++++++++++++++++++++-----------------------
>  1 file changed, 55 insertions(+), 39 deletions(-)
> 
> diff --git a/net/tipc/group.c b/net/tipc/group.c
> index d1161f9..7720361 100644
> --- a/net/tipc/group.c
> +++ b/net/tipc/group.c
> @@ -64,7 +64,6 @@ struct tipc_member {
>       struct rb_node tree_node;
>       struct list_head list;
>       struct list_head small_win;
> -     struct sk_buff *event_msg;
>       struct sk_buff_head deferredq;
>       struct tipc_group *group;
>       u32 node;
> @@ -631,6 +630,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, 
> int blks, u32 node,
>       }
>  }
>  
> +static void tipc_group_create_event(struct tipc_group *grp,
> +                                 struct tipc_member *m,
> +                                 u32 event, u16 seqno,
> +                                 struct sk_buff_head *inputq)
> +{    u32 dnode = tipc_own_addr(grp->net);
> +     struct tipc_event evt;
> +     struct sk_buff *skb;
> +     struct tipc_msg *hdr;
> +
> +     evt.event = event;
> +     evt.found_lower = m->instance;
> +     evt.found_upper = m->instance;
> +     evt.port.ref = m->port;
> +     evt.port.node = m->node;
> +     evt.s.seq.type = grp->type;
> +     evt.s.seq.lower = m->instance;
> +     evt.s.seq.upper = m->instance;
> +
> +     skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
> +                           GROUP_H_SIZE, sizeof(evt), dnode, m->node,
> +                           grp->portid, m->port, 0);
> +     if (!skb)
> +             return;
> +
> +     hdr = buf_msg(skb);
> +     msg_set_nametype(hdr, grp->type);
> +     msg_set_grp_evt(hdr, event);
> +     msg_set_dest_droppable(hdr, true);
> +     msg_set_grp_bc_seqno(hdr, seqno);
> +     memcpy(msg_data(hdr), &evt, sizeof(evt));
> +     TIPC_SKB_CB(skb)->orig_member = m->instance;
> +     __skb_queue_tail(inputq, skb);
> +}
> +
>  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member 
> *m,
>                                 int mtyp, struct sk_buff_head *xmitq)
>  {
> @@ -676,7 +709,6 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool 
> *usr_wakeup,
>       u32 node = msg_orignode(hdr);
>       u32 port = msg_origport(hdr);
>       struct tipc_member *m, *pm;
> -     struct tipc_msg *ehdr;
>       u16 remitted, in_flight;
>  
>       if (!grp)
> @@ -703,9 +735,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool 
> *usr_wakeup,
>                       *usr_wakeup = true;
>                       m->usr_pending = false;
>                       tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
> -                     ehdr = buf_msg(m->event_msg);
> -                     msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
> -                     __skb_queue_tail(inputq, m->event_msg);
> +                     tipc_group_create_event(grp, m, TIPC_PUBLISHED,
> +                                             m->bc_syncpt, inputq);
>               }
>               list_del_init(&m->small_win);
>               tipc_group_update_member(m, 0);
> @@ -724,10 +755,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool 
> *usr_wakeup,
>                       m->state = MBR_LEAVING;
>                       return;
>               }
> -             /* Otherwise deliver already received WITHDRAW event */
> -             ehdr = buf_msg(m->event_msg);
> -             msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
> -             __skb_queue_tail(inputq, m->event_msg);
> +             /* Otherwise deliver member WITHDRAW event */
> +             tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
> +                                     m->bc_syncpt, inputq);
>               return;
>       case GRP_ADV_MSG:
>               if (!m)
> @@ -811,22 +841,15 @@ void tipc_group_member_evt(struct tipc_group *grp,
>       bool node_up;
>       u32 self;
>  
> +     kfree_skb(skb);

I suggest we can directly remove the "skb" parameter from
tipc_group_member_evt().

> +
>       if (!grp)
> -             goto drop;
> +             return;
>  
>       net = grp->net;
>       self = tipc_own_addr(net);
>       if (!grp->loopback && node == self && port == grp->portid)
> -             goto drop;
> -
> -     /* Convert message before delivery to user */
> -     msg_set_hdr_sz(hdr, GROUP_H_SIZE);
> -     msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
> -     msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
> -     msg_set_origport(hdr, port);
> -     msg_set_orignode(hdr, node);
> -     msg_set_nametype(hdr, grp->type);
> -     msg_set_grp_evt(hdr, event);
> +             return;
>  
>       m = tipc_group_find_member(grp, node, port);
>  
> @@ -835,59 +858,52 @@ void tipc_group_member_evt(struct tipc_group *grp,
>                       m = tipc_group_create_member(grp, node, port,
>                                                    MBR_DISCOVERED);
>               if (!m)
> -                     goto drop;
> +                     return;
> +
> +             m->instance = instance;
>  
>               /* Hold back event if JOIN message not yet received */
>               if (m->state == MBR_DISCOVERED) {
> -                     m->event_msg = skb;
>                       m->state = MBR_PUBLISHED;
>               } else {
> -                     msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
> -                     __skb_queue_tail(inputq, skb);
> +                     tipc_group_create_event(grp, m, TIPC_PUBLISHED,
> +                                             m->bc_syncpt, inputq);
>                       m->state = MBR_JOINED;
>                       *usr_wakeup = true;
>                       m->usr_pending = false;
>               }
> -             m->instance = instance;
> -             TIPC_SKB_CB(skb)->orig_member = m->instance;
>               tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
>               tipc_group_update_member(m, 0);
>       } else if (event == TIPC_WITHDRAWN) {
>               if (!m)
> -                     goto drop;
> -
> -             TIPC_SKB_CB(skb)->orig_member = m->instance;
> +                     return;
>  
>               *usr_wakeup = true;
>               m->usr_pending = false;
>               node_up = tipc_node_is_up(net, node);
> -             m->event_msg = NULL;
>  
>               if (node_up) {
>                       /* Hold back event if a LEAVE msg should be expected */
>                       if (m->state != MBR_LEAVING) {
> -                             m->event_msg = skb;
>                               tipc_group_decr_active(grp, m);
>                               m->state = MBR_LEAVING;
>                       } else {
> -                             msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
> -                             __skb_queue_tail(inputq, skb);
> +                             tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
> +                                                     m->bc_syncpt, inputq);
>                       }
>               } else {
>                       if (m->state != MBR_LEAVING) {
>                               tipc_group_decr_active(grp, m);
>                               m->state = MBR_LEAVING;
> -                             msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
> +                             tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
> +                                                     m->bc_rcv_nxt, inputq);
>                       } else {
> -                             msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
> +                             tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
> +                                                     m->bc_syncpt, inputq);
>                       }
> -                     __skb_queue_tail(inputq, skb);
>               }
>               list_del_init(&m->list);
>               list_del_init(&m->small_win);
>       }
>       *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
> -     return;
> -drop:
> -     kfree_skb(skb);
>  }
> 

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to