Network asynchronous IO. This project evolved from the receive zero-copy support work [1].
Network AIO is based on kevent and works as a usual kevent storage on top of an inode. When a new socket is created, it is associated with that inode, and when activity is detected the appropriate notifications are generated and kevent_naio_callback() is called. When a new kevent is registered, the network AIO ->enqueue() callback simply marks itself like a usual socket event watcher. It also locks the physical userspace pages in memory and stores the corresponding pointers in the private kevent structure. The network AIO callback then takes those pointers to userspace pages and tries to copy data from the receive skb queue into them using a protocol-specific callback. This callback is very similar to ->recvmsg(), so the two could share a lot of code in the future. The interface is a bit horrible, but it is the simplest one.

I have run several benchmarks of asynchronous receiving versus stock recv().

Hardware.
Receiving side: Xeon 2.4 GHz, HT disabled, 1GB RAM, 1Gbps Intel 8254IPI e1000 adapter (PCI-X 133MHz slot).
Sending side: AMD64 3500+ 2.2 GHz, 1GB RAM, 1Gbps RealTek 8169 adapter integrated into the nVidia nForce3 chipset (MSI K8N Neo2).
Connection: D-Link DGS-1216T gigabit switch.

Receiving software (naio_recv.c) can be found in the archive [2]. Sending software is a simple sendfile()-based server.
The receiving side runs a 2.6.15-rc7-event FC3 system with default settings.
The sending side runs a 2.6.15-1.1830_FC4 FC4 system with default settings.

Results.
The client receives 1Gb of data in each of 8 runs (4 with asynchronous receiving and 4 with synchronous recv()). Each of the 4 graph panels shows the speed of both types and the CPU usage during the test. Performance reported by netperf-2.3 is about 400Mbit/sec. Picture attached.

[1]. http://tservice.net.ru/~s0mbre/old/?section=projects&item=recv_zero_copy
[2]. http://tservice.net.ru/~s0mbre/archive/kevent/

Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>
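For readers who want to experiment with the interface, below is a minimal, hypothetical userspace sketch (it is not the naio_recv.c test program from [2]). It relies only on definitions visible in this patch: the SO_ASYNC_SOCK option, the KEVENT_NAIO type and the ukevent fields that kevent_naio_enqueue() consumes (id.raw[0] = socket fd, id.raw[1] = buffer size, ptr = destination buffer), and it assumes the ukevent definitions in linux/kevent.h are visible to userspace. The kevent_register() call is a placeholder for the registration path provided by the base kevent patches, which is not part of this patch.

/*
 * Hypothetical sketch only: kevent_register() is a placeholder and error
 * handling is omitted. Everything else uses definitions from this patch.
 */
#include <string.h>
#include <sys/socket.h>
#include <linux/kevent.h>		/* struct ukevent, KEVENT_NAIO, KEVENT_SOCKET_RECV */

#ifndef SO_ASYNC_SOCK
#define SO_ASYNC_SOCK	34		/* added by this patch */
#endif

extern int kevent_register(struct ukevent *uk);	/* placeholder, see note above */

static char buf[1024 * 1024];		/* destination buffer; ->enqueue() pins these pages */

static int start_async_recv(int s)
{
	struct ukevent uk;
	int on = 1;

	/* Mark the socket asynchronous so tcp_v4_rcv() bypasses prequeue/backlog. */
	setsockopt(s, SOL_SOCKET, SO_ASYNC_SOCK, &on, sizeof(on));

	memset(&uk, 0, sizeof(uk));
	uk.type = KEVENT_NAIO;
	uk.event = KEVENT_SOCKET_RECV;
	uk.id.raw[0] = s;		/* socket file descriptor */
	uk.id.raw[1] = sizeof(buf);	/* number of bytes to receive */
	uk.ptr = buf;			/* userspace buffer, pinned via get_user_pages() */

	/*
	 * Registration ends up in kevent_naio_enqueue(); progress and completion
	 * are reported back through ret_flags/ret_data[] when the kevent is ready.
	 */
	return kevent_register(&uk);
}

The remaining byte count and current offset are carried in id.raw[1] and ret_data[1] as the callback makes progress, so a single registration can complete across several skbs.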
diff --git a/arch/i386/kernel/syscall_table.S b/arch/i386/kernel/syscall_table.S
index 9b21a31..30113b7 100644
--- a/arch/i386/kernel/syscall_table.S
+++ b/arch/i386/kernel/syscall_table.S
@@ -294,3 +294,4 @@ ENTRY(sys_call_table)
 	.long sys_inotify_init
 	.long sys_inotify_add_watch
 	.long sys_inotify_rm_watch
+	.long sys_aio_recv
diff --git a/arch/x86_64/ia32/ia32entry.S b/arch/x86_64/ia32/ia32entry.S
index e0eb0c7..93c4fd4 100644
--- a/arch/x86_64/ia32/ia32entry.S
+++ b/arch/x86_64/ia32/ia32entry.S
@@ -643,6 +643,7 @@ ia32_sys_call_table:
 	.quad sys_inotify_init
 	.quad sys_inotify_add_watch
 	.quad sys_inotify_rm_watch
+	.quad sys_aio_recv
 ia32_syscall_end:
 	.rept IA32_NR_syscalls-(ia32_syscall_end-ia32_sys_call_table)/8
 	.quad ni_syscall
diff --git a/include/asm-i386/socket.h b/include/asm-i386/socket.h
index 802ae76..3473f5c 100644
--- a/include/asm-i386/socket.h
+++ b/include/asm-i386/socket.h
@@ -49,4 +49,6 @@
 #define SO_PEERSEC		31
 
+#define SO_ASYNC_SOCK		34
+
 #endif /* _ASM_SOCKET_H */
diff --git a/include/asm-i386/unistd.h b/include/asm-i386/unistd.h
index 0f92e78..0207851 100644
--- a/include/asm-i386/unistd.h
+++ b/include/asm-i386/unistd.h
@@ -299,8 +299,9 @@
 #define __NR_inotify_init	291
 #define __NR_inotify_add_watch	292
 #define __NR_inotify_rm_watch	293
+#define __NR_aio_recv		294
 
-#define NR_syscalls 294
+#define NR_syscalls 295
 
 /*
  * user-visible error numbers are in the range -1 - -128:  see
diff --git a/include/asm-x86_64/ia32_unistd.h b/include/asm-x86_64/ia32_unistd.h
index d5166ec..9fba3a0 100644
--- a/include/asm-x86_64/ia32_unistd.h
+++ b/include/asm-x86_64/ia32_unistd.h
@@ -299,7 +299,8 @@
 #define __NR_ia32_inotify_init		291
 #define __NR_ia32_inotify_add_watch	292
 #define __NR_ia32_inotify_rm_watch	293
+#define __NR_ia32_aio_recv		294
 
-#define IA32_NR_syscalls		294	/* must be > than biggest syscall! */
+#define IA32_NR_syscalls		295	/* must be > than biggest syscall! */
 
 #endif /* _ASM_X86_64_IA32_UNISTD_H_ */
diff --git a/include/asm-x86_64/socket.h b/include/asm-x86_64/socket.h
index f2cdbea..1f31f86 100644
--- a/include/asm-x86_64/socket.h
+++ b/include/asm-x86_64/socket.h
@@ -49,4 +49,6 @@
 #define SO_PEERSEC		31
 
+#define SO_ASYNC_SOCK		34
+
 #endif /* _ASM_SOCKET_H */
diff --git a/include/asm-x86_64/unistd.h b/include/asm-x86_64/unistd.h
index 2c42150..0d7ee3b 100644
--- a/include/asm-x86_64/unistd.h
+++ b/include/asm-x86_64/unistd.h
@@ -571,8 +571,10 @@ __SYSCALL(__NR_inotify_init, sys_inotify
 __SYSCALL(__NR_inotify_add_watch, sys_inotify_add_watch)
 #define __NR_inotify_rm_watch	255
 __SYSCALL(__NR_inotify_rm_watch, sys_inotify_rm_watch)
+#define __NR_aio_recv		256
+__SYSCALL(__NR_aio_recv, sys_aio_recv)
 
-#define __NR_syscall_max __NR_inotify_rm_watch
+#define __NR_syscall_max __NR_aio_recv
 
 #ifndef __NO_STUBS
 
 /* user-visible error numbers are in the range -1 - -4095 */
diff --git a/include/linux/kevent.h b/include/linux/kevent.h
index 3e164d1..8cf83dc 100644
--- a/include/linux/kevent.h
+++ b/include/linux/kevent.h
@@ -42,6 +42,7 @@ enum {
 	KEVENT_INODE,
 	KEVENT_TIMER,
 	KEVENT_POLL,
+	KEVENT_NAIO,
 	KEVENT_MAX,
 };
 
@@ -59,7 +60,7 @@ enum {
 };
 
 /*
- * Socket events.
+ * Socket/network asynchronous IO events.
  */
 enum {
 	KEVENT_SOCKET_RECV	= 0x1,
@@ -109,7 +110,10 @@ struct ukevent
 	__u32			req_flags;	/* Per-event request flags */
 	__u32			ret_flags;	/* Per-event return flags */
 	__u32			ret_data[2];	/* Event return data. Event originator fills it with anything it likes. */
-	__u32			user[2];	/* User's data. It is not used, just copied to/from user. */
+	union {
+		__u32		user[2];	/* User's data. It is not used, just copied to/from user. */
+		void		*ptr;
+	};
 };
 
 enum {
@@ -159,7 +163,7 @@ struct kevent
 	kevent_callback_t	callback;	/* Is called each time new event has been caught. */
 	kevent_callback_t	enqueue;	/* Is called each time new event is queued. */
-	kevent_callback_t	dequeue;	/* Is called each time new event is dequeued. */
+	kevent_callback_t	dequeue;	/* Is called each time event is dequeued. */
 
 	void			*priv;		/* Private data for different storages.
 						 * poll()/select storage has a list of wait_queue_t containers
@@ -212,10 +216,12 @@ void kevent_storage_dequeue(struct keven
 int kevent_break(struct kevent *k);
 int kevent_init(struct kevent *k);
+
 int kevent_init_socket(struct kevent *k);
 int kevent_init_inode(struct kevent *k);
 int kevent_init_timer(struct kevent *k);
 int kevent_init_poll(struct kevent *k);
+int kevent_init_naio(struct kevent *k);
 
 void kevent_storage_ready(struct kevent_storage *st, kevent_callback_t ready_callback, u32 event);
 int kevent_storage_init(__u32 type, __u32 event, void *origin, struct kevent_storage *st);
@@ -238,6 +244,8 @@ static inline void kevent_inode_remove(s
 #endif /* CONFIG_KEVENT_INODE */
 #ifdef CONFIG_KEVENT_SOCKET
 void kevent_socket_notify(struct sock *sock, u32 event);
+int kevent_socket_dequeue(struct kevent *k);
+int kevent_socket_enqueue(struct kevent *k);
 #else
 static inline void kevent_socket_notify(struct sock *sock, u32 event)
 {
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 8c5d600..8dbd67d 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -1232,6 +1232,8 @@ extern struct sk_buff *skb_recv_datagram
 					 int noblock, int *err);
 extern unsigned int	datagram_poll(struct file *file, struct socket *sock,
 				      struct poll_table_struct *wait);
+extern int		skb_copy_datagram(const struct sk_buff *from,
+					  int offset, void *dst, int size);
 extern int		skb_copy_datagram_iovec(const struct sk_buff *from,
 						int offset, struct iovec *to,
 						int size);
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index c7007b1..e729fe1 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -512,4 +512,5 @@ asmlinkage long sys_ioprio_get(int which
 asmlinkage long sys_set_mempolicy(int mode, unsigned long __user *nmask,
 				unsigned long maxnode);
 
+asmlinkage long sys_aio_recv(int fd, void __user *buf, size_t size, unsigned flags);
 #endif
diff --git a/include/net/sock.h b/include/net/sock.h
index e7eaaad..c086188 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -213,6 +213,7 @@ struct sock {
 	int			sk_route_caps;
 	unsigned long		sk_flags;
 	unsigned long		sk_lingertime;
+	unsigned long		sk_async_sock;
 	/*
 	 * The backlog queue is special, it is always used with
 	 * the per-socket spinlock held and requires low latency
@@ -466,7 +467,6 @@ static inline void sk_stream_set_owner_r
 	skb->destructor = sk_stream_rfree;
 	atomic_add(skb->truesize, &sk->sk_rmem_alloc);
 	sk->sk_forward_alloc -= skb->truesize;
-	kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 }
 
 static inline void sk_stream_free_skb(struct sock *sk, struct sk_buff *skb)
@@ -550,6 +550,9 @@ struct proto {
 	int			(*backlog_rcv) (struct sock *sk,
 						struct sk_buff *skb);
+
+	int			(*async_recv) (struct sock *sk,
+						void *dst, size_t size);
 
 	/* Keeping track of sk's, looking them up, and port selection methods.
 	 */
 	void			(*hash)(struct sock *sk);
diff --git a/include/net/tcp.h b/include/net/tcp.h
index 77626a1..e14e48d 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -392,6 +392,7 @@ extern int			tcp_setsockopt(struct sock
 						int optname, char __user *optval,
 						int optlen);
 extern void			tcp_set_keepalive(struct sock *sk, int val);
+extern int			tcp_async_recv(struct sock *sk, void *dst, size_t size);
 extern int			tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
 						struct msghdr *msg,
 						size_t len, int nonblock,
diff --git a/kernel/kevent/Kconfig b/kernel/kevent/Kconfig
index 5aae2ef..a52a86f 100644
--- a/kernel/kevent/Kconfig
+++ b/kernel/kevent/Kconfig
@@ -31,3 +31,9 @@ config KEVENT_POLL
 	depends on KEVENT
 	help
 	  This option allows to use kevent subsystem for poll()/select() notifications.
+
+config KEVENT_NAIO
+	bool "Network asynchronous IO"
+	depends on KEVENT_SOCKET
+	help
+	  This option enables kevent based network asynchronous IO subsystem.
diff --git a/kernel/kevent/Makefile b/kernel/kevent/Makefile
index 4609205..2bc7135 100644
--- a/kernel/kevent/Makefile
+++ b/kernel/kevent/Makefile
@@ -3,3 +3,4 @@ obj-$(CONFIG_KEVENT_SOCKET) += kevent_so
 obj-$(CONFIG_KEVENT_INODE) += kevent_inode.o
 obj-$(CONFIG_KEVENT_TIMER) += kevent_timer.o
 obj-$(CONFIG_KEVENT_POLL) += kevent_poll.o
+obj-$(CONFIG_KEVENT_NAIO) += kevent_naio.o
diff --git a/kernel/kevent/kevent.c b/kernel/kevent/kevent.c
index 61a442f..ef08aa7 100644
--- a/kernel/kevent/kevent.c
+++ b/kernel/kevent/kevent.c
@@ -87,6 +87,9 @@ int kevent_init(struct kevent *k)
 		return -E2BIG;
 
 	switch (k->event.type) {
+		case KEVENT_NAIO:
+			err = kevent_init_naio(k);
+			break;
 		case KEVENT_SOCKET:
 			err = kevent_init_socket(k);
 			break;
@@ -116,9 +119,10 @@ static inline u32 kevent_set_event(struc
 	u32 ev = event & k->event.event;
 
 	st->event &= ~ev;
+#if 0
 	if (ev)
 		k->event.ret_data[1] = ev;
-
+#endif
 	return ev;
 }
 
@@ -165,40 +169,42 @@ void kevent_storage_dequeue(struct keven
 	unsigned long flags;
 
 	spin_lock_irqsave(&st->lock, flags);
-	list_del(&k->storage_entry);
-	st->qlen--;
+	if (k->storage_entry.next != LIST_POISON1) {
+		list_del(&k->storage_entry);
+		st->qlen--;
+	}
 	spin_unlock_irqrestore(&st->lock, flags);
 }
 
 static void __kevent_requeue(struct kevent *k, u32 event)
 {
-	int err, broken;
+	int err, rem = 0;
+
+	wake_up(&k->user->wait);
 
 	err = k->callback(k);
-	if (err < 0)
-		kevent_break(k);
-
-	spin_lock(&k->lock);
-	broken = (k->event.ret_flags & KEVENT_RET_BROKEN);
-	spin_unlock(&k->lock);
 
-	if (err || broken) {
-		spin_lock(&k->lock);
+	spin_lock(&k->lock);
+	if (err > 0) {
 		k->event.ret_flags |= KEVENT_RET_DONE;
-		if (event & k->event.event)
-			k->event.ret_data[0] = event & k->event.event;
-		spin_unlock(&k->lock);
-
-		list_del(&k->storage_entry);
-		list_add_tail(&k->storage_entry, &k->st->list);
+	} else if (err < 0) {
+		k->event.ret_flags |= KEVENT_RET_BROKEN;
+		k->event.ret_flags |= KEVENT_RET_DONE;
+	}
+	rem = (k->event.req_flags & KEVENT_REQ_ONESHOT);
+	spin_unlock(&k->lock);
+
+	if (err) {
+		if (rem) {
+			list_del(&k->storage_entry);
+			k->st->qlen--;
+		}
 
 		spin_lock(&k->user->ready_lock);
 		if (k->ready_entry.next == LIST_POISON1) {
 			list_add_tail(&k->ready_entry, &k->user->ready_list);
 			k->user->ready_num++;
 		}
 		spin_unlock(&k->user->ready_lock);
-		wake_up(&k->user->wait);
 	}
 }
 
@@ -218,11 +224,10 @@ void kevent_storage_ready(struct kevent_
 {
 	unsigned long flags;
 	struct kevent *k, *n;
-	u32 ev;
 	unsigned int qlen;
+	u32 ev = 0;
 
-	spin_lock_irqsave(&st->lock, flags);
-
+	spin_lock_bh(&st->lock);
 	st->event |= event;
 
 	qlen = st->qlen;
 
@@ -234,19 +239,15 @@ void kevent_storage_ready(struct kevent_
 			if (ready_callback)
 				ready_callback(k);
 
-			spin_lock(&k->lock);
-			ev = (k->event.event & event);
-			if (!ev) {
-				spin_unlock(&k->lock);
-				continue;
-			}
-			kevent_set_event(st, k, event);
-			spin_unlock(&k->lock);
+			ev |= (event & k->event.event);
 
-			__kevent_requeue(k, event);
+			if (event & k->event.event)
+				__kevent_requeue(k, event);
 		}
 	}
-	spin_unlock_irqrestore(&st->lock, flags);
+
+	st->event &= ~ev;
+	spin_unlock_bh(&st->lock);
 }
 
 int kevent_storage_init(__u32 type, __u32 event, void *origin, struct kevent_storage *st)
diff --git a/kernel/kevent/kevent_init.c b/kernel/kevent/kevent_init.c
index 2a75c40..74659df 100644
--- a/kernel/kevent/kevent_init.c
+++ b/kernel/kevent/kevent_init.c
@@ -33,7 +33,6 @@ int kevent_break(struct kevent *k)
 	spin_lock_irqsave(&k->lock, flags);
 	k->event.ret_flags |= KEVENT_RET_BROKEN;
 	spin_unlock_irqrestore(&k->lock, flags);
-	printk("%s: k=%p.\n", __func__, k);
 	return 0;
 }
 
@@ -68,3 +67,11 @@ int kevent_init_poll(struct kevent *k)
 	return -ENODEV;
 }
 #endif
+
+#ifndef CONFIG_KEVENT_NAIO
+int kevent_init_naio(struct kevent *k)
+{
+	kevent_break(k);
+	return -ENODEV;
+}
+#endif
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
new file mode 100644
index 0000000..477cbb1
--- /dev/null
+++ b/kernel/kevent/kevent_naio.c
@@ -0,0 +1,182 @@
+/*
+ * 	kevent_naio.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/pagemap.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/tcp_states.h>
+
+static int kevent_naio_enqueue(struct kevent *k);
+static int kevent_naio_dequeue(struct kevent *k);
+static int kevent_naio_callback(struct kevent *k);
+
+asmlinkage long sys_aio_recv(int fd, void __user *buf, size_t size, unsigned flags)
+{
+	return 0;
+}
+
+static int kevent_naio_enqueue(struct kevent *k)
+{
+	int err, i;
+	struct page **page;
+	void *addr;
+	unsigned int size = k->event.id.raw[1];
+	int num = size/PAGE_SIZE + 1;
+	struct file *file;
+	struct sock *sk;
+	int fput_needed;
+
+	file = fget_light(k->event.id.raw[0], &fput_needed);
+	if (!file)
+		return -ENODEV;
+
+	err = -EINVAL;
+	if (!file->f_dentry || !file->f_dentry->d_inode)
+		goto err_out_fput;
+
+	sk = SOCKET_I(file->f_dentry->d_inode)->sk;
+
+	err = -ESOCKTNOSUPPORT;
+	if (!sk || !sk->sk_prot->async_recv)
+		goto err_out_fput;
+
+	page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
+	if (!page)
+		return -ENOMEM;
+
+	addr = k->event.ptr;
+
+	err = get_user_pages(current, current->mm, (unsigned long)addr, num, 1, 0, page, NULL);
+	if (err <= 0)
+		goto err_out_free;
+	num = err;
+
+	k->event.ret_data[0] = num;
+	k->event.ret_data[1] = offset_in_page(k->event.ptr);
+	k->priv = page;
+
+	err = kevent_socket_enqueue(k);
+	if (err < 0)
+		goto err_out_put_pages;
+
+	err = kevent_naio_callback(k);
+	if (err < 0)
+		goto err_out_put_pages;
+
+	return err;
+
+err_out_put_pages:
+	for (i=0; i<num; ++i)
+		page_cache_release(page[i]);
+err_out_free:
+	kfree(page);
+err_out_fput:
+	fput_light(file, fput_needed);
+
+	return err;
+}
+
+static int kevent_naio_dequeue(struct kevent *k)
+{
+	int err, i, num;
+	struct page **page = k->priv;
+
+	num = k->event.ret_data[0];
+
+	err = kevent_socket_dequeue(k);
+
+	for (i=0; i<num; ++i)
+		page_cache_release(page[i]);
+
+	kfree(k->priv);
+	k->priv = NULL;
+
+	return err;
+}
+
+static int kevent_naio_callback(struct kevent *k)
+{
+	struct inode *inode = k->st->origin;
+	struct sock *sk = SOCKET_I(inode)->sk;
+	unsigned int size = k->event.id.raw[1];
+	unsigned int off = k->event.ret_data[1];
+	struct page **pages = k->priv, *page;
+	int ready = 0, num = 0, err = 0;
+	void *ptr, *optr;
+	unsigned int len;
+
+	while (size) {
+		num = off / PAGE_SIZE;
+
+		page = pages[num];
+
+		optr = ptr = kmap_atomic(page, KM_IRQ0);
+		if (!ptr) {
+			err = -ENOMEM;
+			break;
+		}
+
+		ptr += off % PAGE_SIZE;
+		len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
+
+		/*
+		 * sk_prot->async_recv() can return either number of bytes read,
+		 * or negative error value, or zero if socket is closed.
+		 */
+		err = sk->sk_prot->async_recv(sk, ptr, len);
+
+		kunmap_atomic(optr, KM_IRQ0);
+
+		if (err > 0) {
+			num++;
+			size -= err;
+			off += err;
+		}
+
+		if (err != len)
+			break;
+	}
+
+	k->event.ret_data[1] = off;
+	k->event.id.raw[1] = size;
+
+	if (err == 0 || (err < 0 && err != -EAGAIN))
+		ready = -1;
+
+	if (!size)
+		ready = 1;
+
+	return ready;
+}
+
+int kevent_init_naio(struct kevent *k)
+{
+	k->enqueue = &kevent_naio_enqueue;
+	k->dequeue = &kevent_naio_dequeue;
+	k->callback = &kevent_naio_callback;
+	return 0;
+}
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
index 1606eb6..fd74f3b 100644
--- a/kernel/kevent/kevent_socket.c
+++ b/kernel/kevent/kevent_socket.c
@@ -59,7 +59,7 @@ static int kevent_socket_callback(struct
 	return 0;
 }
 
-static int kevent_socket_enqueue(struct kevent *k)
+int kevent_socket_enqueue(struct kevent *k)
 {
 	struct file *file;
 	struct inode *inode;
@@ -93,7 +93,7 @@ err_out_fput:
 	return err;
 }
 
-static int kevent_socket_dequeue(struct kevent *k)
+int kevent_socket_dequeue(struct kevent *k)
 {
 	struct inode *inode = k->st->origin;
 
diff --git a/kernel/kevent/kevent_user.c b/kernel/kevent/kevent_user.c
index 314ad18..ae776c5 100644
--- a/kernel/kevent/kevent_user.c
+++ b/kernel/kevent/kevent_user.c
@@ -252,6 +252,7 @@ static int kevent_user_ctl_modify(struct
 			spin_lock_irqsave(&k->lock, flags);
 			k->event.event = uk.event;
 			k->event.req_flags = uk.req_flags;
+			k->event.ret_flags = 0;
 			spin_unlock_irqrestore(&k->lock, flags);
 			kevent_requeue(k);
 		} else
@@ -343,6 +344,10 @@ static int kevent_user_ctl_add(struct ke
 			break;
 		}
 
+		k->event.ret_flags = 0;
+		k->event.ret_data[0] = 0;
+		k->event.ret_data[1] = 0;
+
 		arg += sizeof(struct ukevent);
 
 		err = kevent_init(k);
@@ -362,6 +367,7 @@ static int kevent_user_ctl_add(struct ke
 		if (err < 0)
 			kevent_free(k);
 		num++;
+		k->event.ret_flags = 0;
 	}
 
 	if (err >= 0) {
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index 1ab2370..fbc2939 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -90,3 +90,5 @@ cond_syscall(sys_pciconfig_iobase);
 cond_syscall(sys32_ipc);
 cond_syscall(sys32_sysctl);
 cond_syscall(ppc_rtas);
+
+cond_syscall(sys_aio_recv);
diff --git a/net/core/datagram.c b/net/core/datagram.c
index 1bcfef5..b2f19a7 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -200,6 +200,60 @@ void skb_free_datagram(struct sock *sk, 
 }
 
 /**
+ *	skb_copy_datagram - Copy a datagram.
+ *	@skb: buffer to copy
+ *	@offset: offset in the buffer to start copying from
+ *	@to: pointer to copy to
+ *	@len: amount of data to copy from buffer to iovec
+ */
+int skb_copy_datagram(const struct sk_buff *skb, int offset,
+		      void *to, int len)
+{
+	int i, fraglen, end = 0;
+	struct sk_buff *next = skb_shinfo(skb)->frag_list;
+
+	if (!len)
+		return 0;
+
+next_skb:
+	fraglen = skb_headlen(skb);
+	i = -1;
+
+	while (1) {
+		int start = end;
+
+		if ((end += fraglen) > offset) {
+			int copy = end - offset, o = offset - start;
+
+			if (copy > len)
+				copy = len;
+			if (i == -1)
+				memcpy(to, skb->data + o, copy);
+			else {
+				skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
+				struct page *page = frag->page;
+				void *p = kmap(page) + frag->page_offset + o;
+				memcpy(to, p, copy);
+				kunmap(page);
+			}
+			if (!(len -= copy))
+				return 0;
+			offset += copy;
+		}
+		if (++i >= skb_shinfo(skb)->nr_frags)
+			break;
+		fraglen = skb_shinfo(skb)->frags[i].size;
+	}
+	if (next) {
+		skb = next;
+		BUG_ON(skb_shinfo(skb)->frag_list);
+		next = skb->next;
+		goto next_skb;
+	}
+	return -EFAULT;
+}
+
+/**
  *	skb_copy_datagram_iovec - Copy a datagram to an iovec.
  *	@skb: buffer to copy
  *	@offset: offset in the buffer to start copying from
@@ -467,6 +521,7 @@ unsigned int datagram_poll(struct file *
 
 EXPORT_SYMBOL(datagram_poll);
 EXPORT_SYMBOL(skb_copy_and_csum_datagram_iovec);
+EXPORT_SYMBOL(skb_copy_datagram);
EXPORT_SYMBOL(skb_copy_datagram_iovec);
 EXPORT_SYMBOL(skb_free_datagram);
 EXPORT_SYMBOL(skb_recv_datagram);
diff --git a/net/core/sock.c b/net/core/sock.c
index cdc3f82..eed500b 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -455,6 +455,11 @@ set_rcvbuf:
 			spin_unlock_bh(&sk->sk_lock.slock);
 			ret = -ENONET;
 			break;
+		case SO_ASYNC_SOCK:
+			spin_lock_bh(&sk->sk_lock.slock);
+			sk->sk_async_sock = valbool;
+			spin_unlock_bh(&sk->sk_lock.slock);
+			break;
 
 		/* We implement the SO_SNDLOWAT etc to
 		   not be settable (1003.1g 5.3) */
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index ef98b14..e9129c5 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -206,6 +206,7 @@
 *	lingertime == 0 (RFC 793 ABORT Call)
 *	Hirokazu Takahashi	:	Use copy_from_user() instead of
 *					csum_and_copy_from_user() if possible.
+ *	Evgeniy Polyakov	:	Network asynchronous IO.
 *
 *		This program is free software; you can redistribute it and/or
 *		modify it under the terms of the GNU General Public License
@@ -1090,6 +1091,160 @@ int tcp_read_sock(struct sock *sk, read_
 }
 
 /*
+ * Must be called with locked sock.
+ */
+int tcp_async_recv(struct sock *sk, void *dst, size_t len)
+{
+	struct tcp_sock *tp = tcp_sk(sk);
+	int copied = 0;
+	u32 *seq;
+	unsigned long used;
+	int err;
+	int target;		/* Read at least this many bytes */
+
+	TCP_CHECK_TIMER(sk);
+
+	err = -ENOTCONN;
+	if (sk->sk_state == TCP_LISTEN)
+		goto out;
+
+	seq = &tp->copied_seq;
+
+	target = sock_rcvlowat(sk, 0, len);
+
+	do {
+		struct sk_buff *skb;
+		u32 offset;
+
+		/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+		if (tp->urg_data && tp->urg_seq == *seq) {
+			if (copied)
+				break;
+		}
+
+		/* Next get a buffer. */
+
+		skb = skb_peek(&sk->sk_receive_queue);
+		do {
+			if (!skb)
+				break;
+
+			/* Now that we have two receive queues this
+			 * shouldn't happen.
+			 */
+			if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+				printk(KERN_INFO "async_recv bug: copied %X "
+				       "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+				break;
+			}
+			offset = *seq - TCP_SKB_CB(skb)->seq;
+			if (skb->h.th->syn)
+				offset--;
+			if (offset < skb->len)
+				goto found_ok_skb;
+			if (skb->h.th->fin)
+				goto found_fin_ok;
+			skb = skb->next;
+		} while (skb != (struct sk_buff *)&sk->sk_receive_queue);
+
+		if (copied)
+			break;
+
+		if (sock_flag(sk, SOCK_DONE))
+			break;
+
+		if (sk->sk_err) {
+			copied = sock_error(sk);
+			break;
+		}
+
+		if (sk->sk_shutdown & RCV_SHUTDOWN)
+			break;
+
+		if (sk->sk_state == TCP_CLOSE) {
+			if (!sock_flag(sk, SOCK_DONE)) {
+				/* This occurs when user tries to read
+				 * from never connected socket.
+				 */
+				copied = -ENOTCONN;
+				break;
+			}
+			break;
+		}
+
+		copied = -EAGAIN;
+		break;
+
+	found_ok_skb:
+		/* Ok so how much can we use? */
+		used = skb->len - offset;
+		if (len < used)
+			used = len;
+
+		/* Do we have urgent data here? */
+		if (tp->urg_data) {
+			u32 urg_offset = tp->urg_seq - *seq;
+			if (urg_offset < used) {
+				if (!urg_offset) {
+					if (!sock_flag(sk, SOCK_URGINLINE)) {
+						++*seq;
+						offset++;
+						used--;
+						if (!used)
+							goto skip_copy;
+					}
+				} else
+					used = urg_offset;
+			}
+		}
+
+		err = skb_copy_datagram(skb, offset, dst, used);
+		if (err) {
+			/* Exception. Bailout!
+			 */
+			if (!copied)
+				copied = -EFAULT;
+			break;
+		}
+
+		*seq += used;
+		copied += used;
+		len -= used;
+		dst += used;
+
+		tcp_rcv_space_adjust(sk);
+
+skip_copy:
+		if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
+			tp->urg_data = 0;
+			tcp_fast_path_check(sk, tp);
+		}
+		if (used + offset < skb->len)
+			continue;
+
+		if (skb->h.th->fin)
+			goto found_fin_ok;
+		sk_eat_skb(sk, skb);
+		continue;
+
+	found_fin_ok:
+		/* Process the FIN. */
+		++*seq;
+		sk_eat_skb(sk, skb);
+		break;
+	} while (len > 0);
+
+	/* Clean up data we have read: This will do ACK frames. */
+	cleanup_rbuf(sk, copied);
+
+	TCP_CHECK_TIMER(sk);
+	return copied;
+
+out:
+	TCP_CHECK_TIMER(sk);
+	return err;
+}
+
+/*
 *	This routine copies from a sock struct into the user buffer.
 *
 *	Technical note: in 2.3 we work on _locked_ socket, so that
@@ -2136,6 +2291,7 @@ EXPORT_SYMBOL(tcp_getsockopt);
 EXPORT_SYMBOL(tcp_ioctl);
 EXPORT_SYMBOL(tcp_poll);
 EXPORT_SYMBOL(tcp_read_sock);
+EXPORT_SYMBOL(tcp_async_recv);
 EXPORT_SYMBOL(tcp_recvmsg);
 EXPORT_SYMBOL(tcp_sendmsg);
 EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 15d3340..8107f6e 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3820,7 +3820,7 @@ int tcp_rcv_established(struct sock *sk,
 		if (tp->ucopy.task == current &&
 		    tp->copied_seq == tp->rcv_nxt &&
 		    len - tcp_header_len <= tp->ucopy.len &&
-		    sock_owned_by_user(sk)) {
+		    sock_owned_by_user(sk) && !sk->sk_async_sock) {
 			__set_current_state(TASK_RUNNING);
 
 			if (!tcp_copy_to_iovec(sk, skb, tcp_header_len)) {
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index c2a542b..17ad358 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -1248,11 +1248,17 @@ process:
 	bh_lock_sock(sk);
 	ret = 0;
-	if (!sock_owned_by_user(sk)) {
-		if (!tcp_prequeue(sk, skb))
-			ret = tcp_v4_do_rcv(sk, skb);
-	} else
-		sk_add_backlog(sk, skb);
+	if (sk->sk_async_sock) {
+		local_bh_disable();
+		ret = tcp_v4_do_rcv(sk, skb);
+		local_bh_enable();
+	} else {
+		if (!sock_owned_by_user(sk)) {
+			if (!tcp_prequeue(sk, skb))
+				ret = tcp_v4_do_rcv(sk, skb);
+		} else
+			sk_add_backlog(sk, skb);
+	}
 	bh_unlock_sock(sk);
 
 	sock_put(sk);
@@ -1977,6 +1983,7 @@ struct proto tcp_prot = {
 	.getsockopt		= tcp_getsockopt,
 	.sendmsg		= tcp_sendmsg,
 	.recvmsg		= tcp_recvmsg,
+	.async_recv		= tcp_async_recv,
 	.backlog_rcv		= tcp_v4_do_rcv,
 	.hash			= tcp_v4_hash,
 	.unhash			= tcp_unhash,

--
	Evgeniy Polyakov
Attachment: naio_vs_sync.png (PNG image)