Avoid usage of common memory accounting functions, since
the logic is pretty much different.

To account for forward allocation, a couple of new atomic_t
members are added to udp_sock: 'mem_allocated' and 'can_reclaim'.
The current forward allocation is estimated as 'mem_allocated'
minus 'sk_rmem_alloc'.

When the forward allocation can't cope with the packet to be
enqueued, 'mem_allocated' is incremented by the packet size
rounded up to the next SK_MEM_QUANTUM.
After a dequeue, if under memory pressure, we try to partially
reclaim the forward allocated memory, rounded down to an
SK_MEM_QUANTUM, and 'mem_allocated' is decreased by that amount.
To protect against concurrent reclaim, we use 'can_reclaim' as
a non-blocking synchronization point and let only one process
do the work.
sk->sk_forward_alloc is set after each memory update to the
currently estimated forward allocation, without any lock or
protection.
This value is updated/maintained only to expose some
semi-reasonable value to the eventual reader, and is guaranteed
to be 0 at socket destruction time.

The above needs custom memory reclaiming on shutdown, provided
by the udp_destruct_sock() helper, which completely reclaims
the allocated forward memory.

v1 -> v2:
  - use a udp specific destructor to perform memory reclaiming
  - remove a couple of helpers, unneeded after the above cleanup
  - do not reclaim memory on dequeue if not under memory
    pressure
  - reworked the fwd accounting scheme to avoid potential
    integer overflow

Signed-off-by: Paolo Abeni <pab...@redhat.com>
---
 include/linux/udp.h |   2 +
 include/net/udp.h   |   7 +++
 net/ipv4/udp.c      | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 146 insertions(+)

diff --git a/include/linux/udp.h b/include/linux/udp.h
index d1fd8cd..16aa22b 100644
--- a/include/linux/udp.h
+++ b/include/linux/udp.h
@@ -42,6 +42,8 @@ static inline u32 udp_hashfn(const struct net *net, u32 num, 
u32 mask)
 struct udp_sock {
        /* inet_sock has to be the first member */
        struct inet_sock inet;
+       atomic_t         mem_allocated;
+       atomic_t         can_reclaim;
 #define udp_port_hash          inet.sk.__sk_common.skc_u16hashes[0]
 #define udp_portaddr_hash      inet.sk.__sk_common.skc_u16hashes[1]
 #define udp_portaddr_node      inet.sk.__sk_common.skc_portaddr_node
diff --git a/include/net/udp.h b/include/net/udp.h
index ea53a87..b9563ef 100644
--- a/include/net/udp.h
+++ b/include/net/udp.h
@@ -98,6 +98,8 @@ static inline struct udp_hslot *udp_hashslot2(struct 
udp_table *table,
 extern struct proto udp_prot;
 
 extern atomic_long_t udp_memory_allocated;
+extern int udp_memory_pressure;
+extern struct percpu_counter udp_sockets_allocated;
 
 /* sysctl variables for udp */
 extern long sysctl_udp_mem[3];
@@ -246,6 +248,10 @@ static inline __be16 udp_flow_src_port(struct net *net, 
struct sk_buff *skb,
 }
 
 /* net/ipv4/udp.c */
+void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
+int udp_rmem_schedule(struct sock *sk, struct sk_buff *skb);
+void udp_enter_memory_pressure(struct sock *sk);
+
 void udp_v4_early_demux(struct sk_buff *skb);
 int udp_get_port(struct sock *sk, unsigned short snum,
                 int (*saddr_cmp)(const struct sock *,
@@ -258,6 +264,7 @@ void udp_flush_pending_frames(struct sock *sk);
 void udp4_hwcsum(struct sk_buff *skb, __be32 src, __be32 dst);
 int udp_rcv(struct sk_buff *skb);
 int udp_ioctl(struct sock *sk, int cmd, unsigned long arg);
+int udp_init_sock(struct sock *sk);
 int udp_disconnect(struct sock *sk, int flags);
 unsigned int udp_poll(struct file *file, struct socket *sock, poll_table 
*wait);
 struct sk_buff *skb_udp_tunnel_segment(struct sk_buff *skb,
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index 7d96dc2..2218901 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -131,6 +131,12 @@ EXPORT_SYMBOL(sysctl_udp_wmem_min);
 atomic_long_t udp_memory_allocated;
 EXPORT_SYMBOL(udp_memory_allocated);
 
+int udp_memory_pressure __read_mostly; /* set by udp_enter_memory_pressure() */
+EXPORT_SYMBOL(udp_memory_pressure);
+
+struct percpu_counter udp_sockets_allocated; /* NOTE(review): no user visible in this patch */
+EXPORT_SYMBOL(udp_sockets_allocated);
+
 #define MAX_UDP_PORTS 65536
 #define PORTS_PER_CHAIN (MAX_UDP_PORTS / UDP_HTABLE_SIZE_MIN)
 
@@ -1172,6 +1178,137 @@ out:
        return ret;
 }
 
+static int __udp_forward(struct udp_sock *up, int rmem) /* fwd alloc estimate */
+{
+       return atomic_read(&up->mem_allocated) - rmem; /* signed: can be < 0 */
+}
+
+static bool udp_under_memory_pressure(const struct sock *sk)
+{
+       if (mem_cgroup_sockets_enabled && sk->sk_memcg &&
+           mem_cgroup_under_socket_pressure(sk->sk_memcg))
+               return true; /* the socket's memcg reports pressure */
+
+       return READ_ONCE(udp_memory_pressure); /* otherwise: the global UDP flag */
+}
+
+void udp_enter_memory_pressure(struct sock *sk) /* @sk is not used here */
+{
+       WRITE_ONCE(udp_memory_pressure, 1); /* NOTE(review): no visible code clears it */
+}
+EXPORT_SYMBOL(udp_enter_memory_pressure);
+
+/* Give back forward allocated memory in whole SK_MEM_QUANTUM units; if partial
+ * != 0, do nothing unless under memory pressure and keep >= @partial bytes.
+ */
+static void udp_rmem_release(struct sock *sk, int partial)
+{
+       struct udp_sock *up = udp_sk(sk);
+       int fwd, amt;
+
+       if (partial && !udp_under_memory_pressure(sk))
+               return;
+
+       /* releases can run concurrently: 'can_reclaim' (set to 1 at init) lets
+        * only one of them proceed, the others return without doing anything
+        */
+       if (atomic_dec_if_positive(&up->can_reclaim) < 0)
+               return;
+
+       fwd = __udp_forward(up, atomic_read(&sk->sk_rmem_alloc));
+       if (fwd < SK_MEM_QUANTUM + partial) { /* less than one quantum to return */
+               atomic_inc(&up->can_reclaim);
+               return;
+       }
+
+       amt = (fwd - partial) & ~(SK_MEM_QUANTUM - 1); /* round down to quantum */
+       atomic_sub(amt, &up->mem_allocated);
+       atomic_inc(&up->can_reclaim); /* let other releasers in again */
+
+       __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT); /* in quanta */
+       sk->sk_forward_alloc = fwd - amt; /* informational, written unlocked */
+}
+
+static void udp_rmem_free(struct sk_buff *skb) /* installed as skb->destructor */
+{
+       struct sock *sk = skb->sk;
+
+       atomic_sub(skb->truesize, &sk->sk_rmem_alloc); /* undo the rmem charge */
+       udp_rmem_release(sk, 1); /* partial: acts only under memory pressure */
+}
+
+int udp_rmem_schedule(struct sock *sk, struct sk_buff *skb) /* 0 or -errno */
+{
+       int fwd, amt, delta, rmem, err = -ENOMEM;
+       struct udp_sock *up = udp_sk(sk);
+
+       rmem = atomic_add_return(skb->truesize, &sk->sk_rmem_alloc); /* charge first */
+       if (rmem > sk->sk_rcvbuf)
+               goto drop; /* over the receive buffer limit: -ENOMEM */
+
+       fwd = __udp_forward(up, rmem); /* rmem already includes this skb */
+       if (fwd > 0)
+               goto no_alloc; /* NOTE(review): an exact fit (fwd == 0) still allocates */
+
+       amt = sk_mem_pages(skb->truesize); /* presumably size in quanta, rounded up */
+       delta = amt << SK_MEM_QUANTUM_SHIFT; /* the same amount, in bytes */
+       if (!__sk_mem_raise_allocated(sk, delta, amt, SK_MEM_RECV)) {
+               err = -ENOBUFS;
+               goto drop;
+       }
+
+       /* if we have some skbs in the error queue, the forward allocation could
+        * be underestimated, even below 0; avoid exporting such values
+        */
+       fwd = atomic_add_return(delta, &up->mem_allocated) - rmem;
+       if (fwd < 0)
+               fwd = SK_MEM_QUANTUM;
+
+no_alloc:
+       sk->sk_forward_alloc = fwd; /* informational, written unlocked */
+       skb_orphan(skb);
+       skb->sk = sk;
+       skb->destructor = udp_rmem_free; /* uncharges sk_rmem_alloc on skb free */
+       return 0;
+
+drop:
+       atomic_sub(skb->truesize, &sk->sk_rmem_alloc); /* undo the charge above */
+       atomic_inc(&sk->sk_drops);
+       return err;
+}
+EXPORT_SYMBOL_GPL(udp_rmem_schedule);
+
+static void udp_destruct_sock(struct sock *sk) /* installed as sk->sk_destruct */
+{
+       /* completely reclaim the forward allocated memory */
+       __skb_queue_purge(&sk->sk_receive_queue);
+       udp_rmem_release(sk, 0); /* partial == 0: skips the pressure check */
+       inet_sock_destruct(sk);
+}
+
+int udp_init_sock(struct sock *sk) /* always returns 0 */
+{
+       struct udp_sock *up = udp_sk(sk);
+
+       atomic_set(&up->mem_allocated, 0); /* nothing accounted yet */
+       atomic_set(&up->can_reclaim, 1); /* 1: a reclaim may be started */
+       sk->sk_destruct = udp_destruct_sock; /* UDP-specific reclaim at teardown */
+       return 0;
+}
+EXPORT_SYMBOL_GPL(udp_init_sock);
+
+void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
+{
+       if (unlikely(READ_ONCE(sk->sk_peek_off) >= 0)) { /* presumably: peek offset set */
+               bool slow = lock_sock_fast(sk);
+
+               sk_peek_offset_bwd(sk, len); /* done with the socket lock held */
+               unlock_sock_fast(sk, slow);
+       }
+       consume_skb(skb); /* presumably ends in skb->destructor on last reference */
+}
+EXPORT_SYMBOL_GPL(skb_consume_udp);
+
 /**
  *     first_packet_length     - return length of first packet in receive queue
  *     @sk: socket
-- 
1.8.3.1

Reply via email to