This patchset includes socket notifications and network asynchronous IO.

Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>

diff --git a/include/asm-i386/socket.h b/include/asm-i386/socket.h
index 802ae76..3473f5c 100644
--- a/include/asm-i386/socket.h
+++ b/include/asm-i386/socket.h
@@ -49,4 +49,6 @@ #define SO_ACCEPTCONN         30
 
 #define SO_PEERSEC             31
 
+#define SO_ASYNC_SOCK          34
+
 #endif /* _ASM_SOCKET_H */

diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 66f8819..ea914c3 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -1269,6 +1269,8 @@ extern struct sk_buff *skb_recv_datagram
                                         int noblock, int *err);
 extern unsigned int    datagram_poll(struct file *file, struct socket *sock,
                                     struct poll_table_struct *wait);
+extern int            skb_copy_datagram(const struct sk_buff *from, 
+                                        int offset, void *dst, int size);
 extern int            skb_copy_datagram_iovec(const struct sk_buff *from,
                                               int offset, struct iovec *to,
                                               int size);
diff --git a/include/net/sock.h b/include/net/sock.h
index d10dfec..7a2bee3 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -47,6 +47,7 @@ #include <linux/module.h>
 #include <linux/netdevice.h>
 #include <linux/skbuff.h>      /* struct sk_buff */
 #include <linux/security.h>
+#include <linux/kevent.h>
 
 #include <linux/filter.h>
 
@@ -386,6 +387,8 @@ enum sock_flags {
        SOCK_NO_LARGESEND, /* whether to sent large segments or not */
        SOCK_LOCALROUTE, /* route locally only, %SO_DONTROUTE setting */
        SOCK_QUEUE_SHRUNK, /* write queue has been shrunk recently */
+       SOCK_ASYNC,
+       SOCK_ASYNC_INUSE,
 };
 
 static inline void sock_copy_flags(struct sock *nsk, struct sock *osk)
@@ -445,6 +448,21 @@ static inline int sk_stream_memory_free(
 
 extern void sk_stream_rfree(struct sk_buff *skb);
 
+struct socket_alloc {
+       struct socket socket;
+       struct inode vfs_inode;
+};
+
+static inline struct socket *SOCKET_I(struct inode *inode)
+{
+       return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
+}
+
+static inline struct inode *SOCK_INODE(struct socket *socket)
+{
+       return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
+}
+
 static inline void sk_stream_set_owner_r(struct sk_buff *skb, struct sock *sk)
 {
        skb->sk = sk;
@@ -472,6 +490,7 @@ static inline void sk_add_backlog(struct
                sk->sk_backlog.tail = skb;
        }
        skb->next = NULL;
+       kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 }
 
 #define sk_wait_event(__sk, __timeo, __condition)              \
@@ -543,6 +562,12 @@ struct proto {
 
        int                     (*backlog_rcv) (struct sock *sk, 
                                                struct sk_buff *skb);
+       
+       int                     (*async_recv) (struct sock *sk, 
+                                               void *dst, size_t size);
+       int                     (*async_send) (struct sock *sk, 
+                                               struct page **pages, unsigned int poffset, 
+                                               size_t size);
 
        /* Keeping track of sk's, looking them up, and port selection methods. */
        void                    (*hash)(struct sock *sk);
@@ -674,21 +699,6 @@ static inline struct kiocb *siocb_to_kio
        return si->kiocb;
 }
 
-struct socket_alloc {
-       struct socket socket;
-       struct inode vfs_inode;
-};
-
-static inline struct socket *SOCKET_I(struct inode *inode)
-{
-       return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
-}
-
-static inline struct inode *SOCK_INODE(struct socket *socket)
-{
-       return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
-}
-
 extern void __sk_stream_mem_reclaim(struct sock *sk);
 extern int sk_stream_mem_schedule(struct sock *sk, int size, int kind);
 
diff --git a/include/net/tcp.h b/include/net/tcp.h
index 5f4eb5c..820cd5a 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -364,6 +364,8 @@ extern int                  compat_tcp_setsockopt(struc
                                        int level, int optname,
                                        char __user *optval, int optlen);
 extern void                    tcp_set_keepalive(struct sock *sk, int val);
+extern int                     tcp_async_recv(struct sock *sk, void *dst, size_t size);
+extern int                     tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t size);
 extern int                     tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
                                            struct msghdr *msg,
                                            size_t len, int nonblock, 
@@ -857,6 +859,7 @@ static inline int tcp_prequeue(struct so
                        tp->ucopy.memory = 0;
                } else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
                        wake_up_interruptible(sk->sk_sleep);
+                       kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
                        if (!inet_csk_ack_scheduled(sk))
                                inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
                                                          (3 * TCP_RTO_MIN) / 4,
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
new file mode 100644
index 0000000..1c71021
--- /dev/null
+++ b/kernel/kevent/kevent_naio.c
@@ -0,0 +1,239 @@
+/*
+ *     kevent_naio.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/pagemap.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/tcp_states.h>
+
+static int kevent_naio_enqueue(struct kevent *k);
+static int kevent_naio_dequeue(struct kevent *k);
+static int kevent_naio_callback(struct kevent *k);
+
+/*
+ * Fill a one-shot KEVENT_NAIO request describing the @size byte buffer
+ * @buf for socket @s and add it to the kevent_user behind @ctl_fd.
+ */
+static int kevent_naio_setup_aio(int ctl_fd, int s, void __user *buf,
+               size_t size, u32 event)
+{
+       int fput_needed;
+       int ret = -EINVAL;
+       struct ukevent req;
+       struct kevent_user *user;
+       struct file *ctl = fget_light(ctl_fd, &fput_needed);
+
+       if (!ctl)
+               return -ENODEV;
+
+       user = ctl->private_data;
+       if (user) {
+               memset(&req, 0, sizeof(req));
+               req.type = KEVENT_NAIO;
+               req.ptr = buf;
+               req.req_flags = KEVENT_REQ_ONESHOT;
+               req.event = event;
+               req.id.raw[0] = s;
+               req.id.raw[1] = size;
+
+               ret = kevent_user_add_ukevent(&req, user);
+       }
+
+       fput_light(ctl, fput_needed);
+       return ret;
+}
+
+asmlinkage long sys_aio_recv(int ctl_fd, int s, void __user *buf, 
+               size_t size, unsigned flags)    /* flags is not used here */
+{
+       return kevent_naio_setup_aio(ctl_fd, s, buf, size, KEVENT_SOCKET_RECV);
+}
+
+asmlinkage long sys_aio_send(int ctl_fd, int s, void __user *buf, 
+               size_t size, unsigned flags)    /* flags is not used here */
+{
+       return kevent_naio_setup_aio(ctl_fd, s, buf, size, KEVENT_SOCKET_SEND);
+}
+
+/*
+ * Pin the user buffer of @k (pages in k->priv, count in ret_data[0],
+ * first-page offset in ret_data[1]) and attach @k to its socket.
+ */
+static int kevent_naio_enqueue(struct kevent *k)
+{
+       int err, i, num, fput_needed;
+       struct page **page;
+       unsigned long addr = (unsigned long)k->event.ptr;
+       unsigned int size = k->event.id.raw[1];
+       struct file *file;
+       struct sock *sk;
+
+       file = fget_light(k->event.id.raw[0], &fput_needed);
+       if (!file)
+               return -ENODEV;
+
+       err = -EINVAL;
+       if (!size || !file->f_dentry || !file->f_dentry->d_inode)
+               goto err_out_fput;
+
+       sk = SOCKET_I(file->f_dentry->d_inode)->sk;
+       err = -ESOCKTNOSUPPORT;
+       if (!sk || !sk->sk_prot->async_recv || !sk->sk_prot->async_send ||
+               !sock_flag(sk, SOCK_ASYNC))
+               goto err_out_fput;
+
+       /* Every page touched by [addr, addr + size), partial ones included. */
+       num = size / PAGE_SIZE +
+               (offset_in_page(addr) + size % PAGE_SIZE + PAGE_SIZE - 1) / PAGE_SIZE;
+
+       err = -ENOMEM;
+       page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
+       if (!page)
+               goto err_out_fput;
+
+       down_read(&current->mm->mmap_sem);
+       err = get_user_pages(current, current->mm, addr, num, 1, 0, page, NULL);
+       up_read(&current->mm->mmap_sem);
+       if (err != num) {
+               /* Nothing or only part of the buffer was pinned: undo and fail. */
+               num = err > 0 ? err : 0;
+               err = err < 0 ? err : -EFAULT;
+               goto err_out_put_pages;
+       }
+
+       k->event.ret_data[0] = num;
+       k->event.ret_data[1] = offset_in_page(addr);
+       k->priv = page;
+       sk->sk_allocation = GFP_ATOMIC;
+
+       spin_lock_bh(&sk->sk_lock.slock);
+       err = kevent_socket_enqueue(k);
+       spin_unlock_bh(&sk->sk_lock.slock);
+       if (err)
+               goto err_out_put_pages;
+
+       fput_light(file, fput_needed);
+       return 0;
+
+err_out_put_pages:
+       for (i = 0; i < num; ++i)
+               page_cache_release(page[i]);
+       kfree(page);
+err_out_fput:
+       fput_light(file, fput_needed);
+       return err;
+}
+
+/* Dequeue @k via kevent_socket_dequeue(), then unpin and free k->priv pages. */
+static int kevent_naio_dequeue(struct kevent *k)
+{
+       struct page **pinned = k->priv;
+       int count = k->event.ret_data[0];
+       int idx, ret;
+
+       ret = kevent_socket_dequeue(k);
+
+       for (idx = 0; idx < count; ++idx)
+               page_cache_release(pinned[idx]);
+
+       kfree(k->priv);
+       k->priv = NULL;
+
+       return ret;
+}
+
+static int kevent_naio_callback(struct kevent *k)      /* one send/recv step for a NAIO request */
+{
+       struct inode *inode = k->st->origin;
+       struct sock *sk = SOCKET_I(inode)->sk;
+       unsigned int size = k->event.id.raw[1];
+       unsigned int off = k->event.ret_data[1];
+       struct page **pages = k->priv, *page;
+       int ready = 0, num = off/PAGE_SIZE, err = 0, send = 0;
+       void *ptr, *optr;
+       unsigned int len;
+
+       if (!sock_flag(sk, SOCK_ASYNC))
+               return -1;
+
+       if (k->event.event & KEVENT_SOCKET_SEND)
+               send = 1;
+       else if (!(k->event.event & KEVENT_SOCKET_RECV))
+               return -EINVAL;
+
+       /*
+        * sk_prot->async_*() can return either number of bytes processed,
+        * or negative error value, or zero if socket is closed.
+        * Receive fills at most the rest of the current page per call.
+        */
+       if (!send) {
+               page = pages[num];      /* page holding byte offset off */
+
+               optr = ptr = kmap_atomic(page, KM_IRQ0);
+               if (!ptr)
+                       return -ENOMEM;
+
+               ptr += off % PAGE_SIZE;
+               len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
+
+               err = sk->sk_prot->async_recv(sk, ptr, len);
+
+               kunmap_atomic(optr, KM_IRQ0);
+       } else {
+               len = size;     /* NOTE(review): len is not read on the send path */
+               err = sk->sk_prot->async_send(sk, pages, off, size);
+       }
+
+       if (err > 0) {
+               num++;          /* NOTE(review): num is not read after this point */
+               size -= err;
+               off += err;
+       }
+
+       k->event.ret_data[1] = off;     /* progress is stored back into the event */
+       k->event.id.raw[1] = size;
+
+       if (err == 0 || (err < 0 && err != -EAGAIN))
+               ready = -1;     /* zero result or an error other than -EAGAIN */
+
+       if (!size)
+               ready = 1;      /* nothing left to transfer */
+#if 0
+       printk("%s: sk=%p, k=%p, size=%4u, off=%4u, err=%3d, ready=%1d.\n",
+                       __func__, sk, k, size, off, err, ready);
+#endif
+
+       return ready;
+}
+
+int kevent_init_naio(struct kevent *k) /* install the NAIO handlers on k */
+{
+       k->callback = kevent_naio_callback;
+       k->dequeue = kevent_naio_dequeue;
+       k->enqueue = kevent_naio_enqueue;
+       return 0;
+}
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
new file mode 100644
index 0000000..c230aaa
--- /dev/null
+++ b/kernel/kevent/kevent_socket.c
@@ -0,0 +1,125 @@
+/*
+ *     kevent_socket.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/tcp.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/request_sock.h>
+#include <net/inet_connection_sock.h>
+
+/*
+ * Readiness check for a socket kevent: returns 1 when a requested
+ * RECV or ACCEPT condition currently holds, 0 otherwise.
+ */
+static int kevent_socket_callback(struct kevent *k)
+{
+       struct inode *inode = k->st->origin;
+       struct sock *sk = SOCKET_I(inode)->sk;
+
+       if ((k->event.event & KEVENT_SOCKET_RECV) &&
+           (atomic_read(&sk->sk_rmem_alloc) > 0 ||
+            !skb_queue_empty(&sk->sk_receive_queue) ||
+            (sk->sk_shutdown & RCV_SHUTDOWN)))
+               return 1;
+
+       if (k->event.event & KEVENT_SOCKET_ACCEPT) {
+               struct request_sock_queue *q = &inet_csk(sk)->icsk_accept_queue;
+
+               if (!reqsk_queue_empty(q) || reqsk_queue_len_young(q)) {
+                       k->event.ret_data[1] = reqsk_queue_len(q);
+                       return 1;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * Attach @k to the inode of the descriptor stored in k->event.id.raw[0]
+ * and run the callback once; a non-zero callback result detaches @k
+ * again and is returned to the caller.
+ */
+int kevent_socket_enqueue(struct kevent *k)
+{
+       int fput_needed;
+       int ret = -EINVAL;
+       struct inode *ino = NULL;
+       struct file *filp = fget_light(k->event.id.raw[0], &fput_needed);
+
+       if (!filp)
+               return -ENODEV;
+
+       if (filp->f_dentry && filp->f_dentry->d_inode)
+               ino = igrab(filp->f_dentry->d_inode);
+       if (!ino)
+               goto out_fput;
+
+       ret = kevent_storage_enqueue(&ino->st, k);
+       if (ret)
+               goto out_iput;
+
+       /* On success the inode reference is kept until dequeue. */
+       ret = k->callback(k);
+       if (!ret)
+               goto out_fput;
+
+       /* Non-zero callback result: undo the enqueue. */
+       kevent_storage_dequeue(k->st, k);
+out_iput:
+       iput(ino);
+out_fput:
+       fput_light(filp, fput_needed);
+       return ret;
+}
+
+/* Remove @k from its storage and drop the inode reference taken at enqueue. */
+int kevent_socket_dequeue(struct kevent *k)
+{
+       struct inode *origin = k->st->origin;
+
+       kevent_storage_dequeue(k->st, k);
+       iput(origin);
+       return 0;
+}
+
+int kevent_init_socket(struct kevent *k)       /* install the socket handlers on k */
+{
+       k->callback = kevent_socket_callback;
+       k->dequeue = kevent_socket_dequeue;
+       k->enqueue = kevent_socket_enqueue;
+       return 0;
+}
+
+void kevent_socket_notify(struct sock *sk, u32 event)  /* mark sk's kevents ready for event */
+{
+       if (sk->sk_socket && !test_and_set_bit(SOCK_ASYNC_INUSE, &sk->sk_flags)) {
+               kevent_storage_ready(&SOCK_INODE(sk->sk_socket)->st, NULL, event);
+               clear_bit(SOCK_ASYNC_INUSE, &sk->sk_flags);     /* atomic, pairs with test_and_set_bit() */
+       }
+}
diff --git a/net/core/datagram.c b/net/core/datagram.c
index aecddcc..493245b 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -236,6 +236,60 @@ void skb_kill_datagram(struct sock *sk, 
 EXPORT_SYMBOL(skb_kill_datagram);
 
 /**
+ *     skb_copy_datagram - Copy a datagram to a linear buffer.
+ *     @skb: buffer to copy
+ *     @offset: offset in the buffer to start copying from
+ *     @to: pointer to copy to
+ *     @len: amount of data to copy from buffer to @to
+ *     Returns 0 once @len bytes are copied, -EFAULT if the data runs out.
+ */
+int skb_copy_datagram(const struct sk_buff *skb, int offset,
+                           void *to, int len)
+{
+       int i, fraglen, end = 0;
+       struct sk_buff *next = skb_shinfo(skb)->frag_list;
+
+       if (!len)
+               return 0;
+
+next_skb:
+       fraglen = skb_headlen(skb);
+       i = -1;
+
+       while (1) {
+               int start = end;
+
+               if ((end += fraglen) > offset) {
+                       int o = offset - start, copy = min(end - offset, len);
+
+                       if (i == -1)
+                               memcpy(to, skb->data + o, copy);
+                       else {
+                               skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
+                               struct page *page = frag->page;
+                               void *p = kmap(page) + frag->page_offset + o;
+                               memcpy(to, p, copy);
+                               kunmap(page);
+                       }
+                       if (!(len -= copy))
+                               return 0;
+                       offset += copy;
+                       to += copy;     /* next piece goes after this one */
+               }
+               if (++i >= skb_shinfo(skb)->nr_frags)
+                       break;
+               fraglen = skb_shinfo(skb)->frags[i].size;
+       }
+       if (next) {
+               skb = next;
+               BUG_ON(skb_shinfo(skb)->frag_list);
+               next = skb->next;
+               goto next_skb;
+       }
+       return -EFAULT;
+}
+
+/**
  *     skb_copy_datagram_iovec - Copy a datagram to an iovec.
  *     @skb: buffer to copy
  *     @offset: offset in the buffer to start copying from
@@ -530,6 +584,7 @@ unsigned int datagram_poll(struct file *
 
 EXPORT_SYMBOL(datagram_poll);
 EXPORT_SYMBOL(skb_copy_and_csum_datagram_iovec);
+EXPORT_SYMBOL(skb_copy_datagram);
 EXPORT_SYMBOL(skb_copy_datagram_iovec);
 EXPORT_SYMBOL(skb_free_datagram);
 EXPORT_SYMBOL(skb_recv_datagram);
diff --git a/net/core/sock.c b/net/core/sock.c
index 5d820c3..3345048 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -564,6 +564,16 @@ #endif
                        spin_unlock_bh(&sk->sk_lock.slock);
                        ret = -ENONET;
                        break;
+#ifdef CONFIG_KEVENT_SOCKET
+               case SO_ASYNC_SOCK:
+                       spin_lock_bh(&sk->sk_lock.slock);
+                       if (valbool)
+                               sock_set_flag(sk, SOCK_ASYNC);
+                       else
+                               sock_reset_flag(sk, SOCK_ASYNC);
+                       spin_unlock_bh(&sk->sk_lock.slock);
+                       break;
+#endif
 
                /* We implement the SO_SNDLOWAT etc to
                   not be settable (1003.1g 5.3) */
@@ -1313,6 +1323,7 @@ static void sock_def_wakeup(struct sock 
        if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
                wake_up_interruptible_all(sk->sk_sleep);
        read_unlock(&sk->sk_callback_lock);
+       kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_error_report(struct sock *sk)
@@ -1322,6 +1333,7 @@ static void sock_def_error_report(struct
                wake_up_interruptible(sk->sk_sleep);
        sk_wake_async(sk,0,POLL_ERR); 
        read_unlock(&sk->sk_callback_lock);
+       kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_readable(struct sock *sk, int len)
@@ -1331,6 +1343,7 @@ static void sock_def_readable(struct soc
                wake_up_interruptible(sk->sk_sleep);
        sk_wake_async(sk,1,POLL_IN);
        read_unlock(&sk->sk_callback_lock);
+       kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_write_space(struct sock *sk)
@@ -1350,6 +1363,7 @@ static void sock_def_write_space(struct 
        }
 
        read_unlock(&sk->sk_callback_lock);
+       kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
 }
 
 static void sock_def_destruct(struct sock *sk)
@@ -1454,8 +1468,10 @@ void fastcall release_sock(struct sock *
        if (sk->sk_backlog.tail)
                __release_sock(sk);
        sk->sk_lock.owner = NULL;
-        if (waitqueue_active(&(sk->sk_lock.wq)))
+        if (waitqueue_active(&(sk->sk_lock.wq))) {
                wake_up(&(sk->sk_lock.wq));
+               kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
+       }
        spin_unlock_bh(&(sk->sk_lock.slock));
 }
 EXPORT_SYMBOL(release_sock);
diff --git a/net/core/stream.c b/net/core/stream.c
index e948969..91e2e07 100644
--- a/net/core/stream.c
+++ b/net/core/stream.c
@@ -36,6 +36,7 @@ void sk_stream_write_space(struct sock *
                        wake_up_interruptible(sk->sk_sleep);
                if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
                        sock_wake_async(sock, 2, POLL_OUT);
+               kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
        }
 }
 
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index 74998f2..403d33e 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -206,6 +206,7 @@
  *                                     lingertime == 0 (RFC 793 ABORT Call)
  *     Hirokazu Takahashi      :       Use copy_from_user() instead of
  *                                     csum_and_copy_from_user() if possible.
+ *     Evgeniy Polyakov        :       Network asynchronous IO.
  *
  *             This program is free software; you can redistribute it and/or
  *             modify it under the terms of the GNU General Public License
@@ -1085,6 +1086,275 @@ int tcp_read_sock(struct sock *sk, read_
 }
 
 /*
+ * Attach @len bytes of @pages, from byte @poffset, to the write queue. Must be called with locked sock.
+ */
+int tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t len)
+{
+       struct tcp_sock *tp = tcp_sk(sk);
+       int mss_now, size_goal;
+       int err = -EAGAIN;
+       ssize_t copied;
+
+       /* Connection is not established yet: report -EAGAIN instead of waiting. */
+       if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+               goto out_err;
+
+       clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+       mss_now = tcp_current_mss(sk, 1);
+       size_goal = tp->xmit_size_goal;
+       copied = 0;
+
+       err = -EPIPE;
+       if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN) || sock_flag(sk, SOCK_DONE) ||
+                       (sk->sk_state == TCP_CLOSE) || (atomic_read(&sk->sk_refcnt) == 1))
+               goto do_error;
+
+       while (len > 0) {
+               struct sk_buff *skb = sk->sk_write_queue.prev;
+               struct page *page = pages[poffset / PAGE_SIZE];
+               int copy, i, can_coalesce;
+               int offset = poffset % PAGE_SIZE;
+               int size = min_t(size_t, len, PAGE_SIZE - offset);
+
+               if (!sk->sk_send_head || (copy = size_goal - skb->len) <= 0) {
+new_segment:
+                       if (!sk_stream_memory_free(sk))
+                               goto wait_for_sndbuf;
+
+                       skb = sk_stream_alloc_pskb(sk, 0, 0,
+                                                  sk->sk_allocation);
+                       if (!skb)
+                               goto wait_for_memory;
+
+                       skb_entail(sk, tp, skb);
+                       copy = size_goal;
+               }
+
+               if (copy > size)
+                       copy = size;
+
+               i = skb_shinfo(skb)->nr_frags;
+               can_coalesce = skb_can_coalesce(skb, i, page, offset);
+               if (!can_coalesce && i >= MAX_SKB_FRAGS) {
+                       tcp_mark_push(tp, skb);
+                       goto new_segment;
+               }
+               if (!sk_stream_wmem_schedule(sk, copy))
+                       goto wait_for_memory;
+               
+               if (can_coalesce) {
+                       skb_shinfo(skb)->frags[i - 1].size += copy;     /* extend the last fragment */
+               } else {
+                       get_page(page); /* reference held for the new fragment */
+                       skb_fill_page_desc(skb, i, page, offset, copy);
+               }
+
+               skb->len += copy;
+               skb->data_len += copy;
+               skb->truesize += copy;
+               sk->sk_wmem_queued += copy;
+               sk->sk_forward_alloc -= copy;
+               skb->ip_summed = CHECKSUM_HW;
+               tp->write_seq += copy;
+               TCP_SKB_CB(skb)->end_seq += copy;
+               skb_shinfo(skb)->tso_segs = 0;
+
+               if (!copied)
+                       TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+
+               copied += copy;
+               poffset += copy;
+               if (!(len -= copy))
+                       goto out;
+
+               if (skb->len < mss_now)
+                       continue;
+
+               if (forced_push(tp)) {
+                       tcp_mark_push(tp, skb);
+                       __tcp_push_pending_frames(sk, tp, mss_now, TCP_NAGLE_PUSH);
+               } else if (skb == sk->sk_send_head)
+                       tcp_push_one(sk, mss_now);
+               continue;
+
+wait_for_sndbuf:
+               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+               if (copied)
+                       tcp_push(sk, tp, 0, mss_now, TCP_NAGLE_PUSH);
+
+               err = -EAGAIN;  /* no buffer space or memory: do not wait here */
+               goto do_error;
+       }
+
+out:
+       if (copied)
+               tcp_push(sk, tp, 0, mss_now, tp->nonagle);
+       return copied;  /* bytes queued; can be less than the requested len */
+
+do_error:
+       if (copied)
+               goto out;       /* partial progress is reported instead of the error */
+out_err:
+       return sk_stream_error(sk, 0, err);
+}
+
+/*
+ * Must be called with locked sock.
+ */
+int tcp_async_recv(struct sock *sk, void *dst, size_t len)
+{
+       struct tcp_sock *tp = tcp_sk(sk);
+       int copied = 0;
+       u32 *seq;
+       unsigned long used;
+       int err;
+       int target;             /* Read at least this many bytes */
+
+       TCP_CHECK_TIMER(sk);
+
+       err = -ENOTCONN;
+       if (sk->sk_state == TCP_LISTEN)
+               goto out;
+
+       seq = &tp->copied_seq;
+
+       target = sock_rcvlowat(sk, 0, len);
+
+       do {
+               struct sk_buff *skb;
+               u32 offset;
+
+               /* Are we at urgent data? Stop if we have read anything or have 
SIGURG pending. */
+               if (tp->urg_data && tp->urg_seq == *seq) {
+                       if (copied)
+                               break;
+               }
+
+               /* Next get a buffer. */
+
+               skb = skb_peek(&sk->sk_receive_queue);
+               do {
+                       if (!skb)
+                               break;
+
+                       /* Now that we have two receive queues this
+                        * shouldn't happen.
+                        */
+                       if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+                               printk(KERN_INFO "async_recv bug: copied %X "
+                                      "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+                               break;
+                       }
+                       offset = *seq - TCP_SKB_CB(skb)->seq;
+                       if (skb->h.th->syn)
+                               offset--;
+                       if (offset < skb->len)
+                               goto found_ok_skb;
+                       if (skb->h.th->fin)
+                               goto found_fin_ok;
+                       skb = skb->next;
+               } while (skb != (struct sk_buff *)&sk->sk_receive_queue);
+
+               if (copied)
+                       break;
+
+               if (sock_flag(sk, SOCK_DONE))
+                       break;
+
+               if (sk->sk_err) {
+                       copied = sock_error(sk);
+                       break;
+               }
+
+               if (sk->sk_shutdown & RCV_SHUTDOWN)
+                       break;
+
+               if (sk->sk_state == TCP_CLOSE) {
+                       if (!sock_flag(sk, SOCK_DONE)) {
+                               /* This occurs when user tries to read
+                                * from never connected socket.
+                                */
+                               copied = -ENOTCONN;
+                               break;
+                       }
+                       break;
+               }
+
+               copied = -EAGAIN;
+               break;
+
+       found_ok_skb:
+               /* Ok so how much can we use? */
+               used = skb->len - offset;
+               if (len < used)
+                       used = len;
+
+               /* Do we have urgent data here? */
+               if (tp->urg_data) {
+                       u32 urg_offset = tp->urg_seq - *seq;
+                       if (urg_offset < used) {
+                               if (!urg_offset) {
+                                       if (!sock_flag(sk, SOCK_URGINLINE)) {
+                                               ++*seq;
+                                               offset++;
+                                               used--;
+                                               if (!used)
+                                                       goto skip_copy;
+                                       }
+                               } else
+                                       used = urg_offset;
+                       }
+               }
+
+               err = skb_copy_datagram(skb, offset, dst, used);
+               if (err) {
+                       /* Exception. Bailout! */
+                       if (!copied)
+                               copied = -EFAULT;
+                       break;
+               }
+
+               *seq += used;
+               copied += used;
+               len -= used;
+               dst += used;
+
+               tcp_rcv_space_adjust(sk);
+
+skip_copy:
+               if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
+                       tp->urg_data = 0;
+                       tcp_fast_path_check(sk, tp);
+               }
+               if (used + offset < skb->len)
+                       continue;
+
+               if (skb->h.th->fin)
+                       goto found_fin_ok;
+               sk_eat_skb(sk, skb);
+               continue;
+
+       found_fin_ok:
+               /* Process the FIN. */
+               ++*seq;
+               sk_eat_skb(sk, skb);
+               break;
+       } while (len > 0);
+
+       /* Clean up data we have read: This will do ACK frames. */
+       cleanup_rbuf(sk, copied);
+
+       TCP_CHECK_TIMER(sk);
+       return copied;
+
+out:
+       TCP_CHECK_TIMER(sk);
+       return err;
+}
+
+/*
  *     This routine copies from a sock struct into the user buffer.
  *
  *     Technical note: in 2.3 we work on _locked_ socket, so that
@@ -2259,6 +2529,8 @@ EXPORT_SYMBOL(tcp_getsockopt);
 EXPORT_SYMBOL(tcp_ioctl);
 EXPORT_SYMBOL(tcp_poll);
 EXPORT_SYMBOL(tcp_read_sock);
+EXPORT_SYMBOL(tcp_async_recv);
+EXPORT_SYMBOL(tcp_async_send);
 EXPORT_SYMBOL(tcp_recvmsg);
 EXPORT_SYMBOL(tcp_sendmsg);
 EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index e08245b..5655b1e 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3113,6 +3113,7 @@ static void tcp_ofo_queue(struct sock *s
 
                __skb_unlink(skb, &tp->out_of_order_queue);
                __skb_queue_tail(&sk->sk_receive_queue, skb);
+               kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
                tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
                if(skb->h.th->fin)
                        tcp_fin(skb, sk, skb->h.th);
@@ -3956,7 +3957,8 @@ int tcp_rcv_established(struct sock *sk,
                        int copied_early = 0;
 
                        if (tp->copied_seq == tp->rcv_nxt &&
-                           len - tcp_header_len <= tp->ucopy.len) {
+                           len - tcp_header_len <= tp->ucopy.len &&
+                           !sock_async(sk)) {
 #ifdef CONFIG_NET_DMA
                                 if (tcp_dma_try_early_copy(sk, skb, tcp_header_len)) {
                                        copied_early = 1;
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index 25ecc6e..05d7086 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -62,6 +62,7 @@ #include <linux/cache.h>
 #include <linux/jhash.h>
 #include <linux/init.h>
 #include <linux/times.h>
+#include <linux/kevent.h>
 
 #include <net/icmp.h>
 #include <net/inet_hashtables.h>
@@ -850,6 +851,7 @@ #endif
                reqsk_free(req);
        } else {
                inet_csk_reqsk_queue_hash_add(sk, req, TCP_TIMEOUT_INIT);
+               kevent_socket_notify(sk, KEVENT_SOCKET_ACCEPT);
        }
        return 0;
 
@@ -1089,24 +1091,34 @@ process:
 
        skb->dev = NULL;
 
-       bh_lock_sock(sk);
        ret = 0;
-       if (!sock_owned_by_user(sk)) {
+       /*
+        * Async sockets are received directly under the socket spinlock,
+        * skipping the owned-by-user check, prequeue and backlog used below.
+        */
+       if (sock_async(sk)) {
+               spin_lock_bh(&sk->sk_lock.slock);
+               ret = tcp_v4_do_rcv(sk, skb);
+               spin_unlock_bh(&sk->sk_lock.slock);
+       } else {
+               bh_lock_sock(sk);
+               if (!sock_owned_by_user(sk)) {
 #ifdef CONFIG_NET_DMA
-               struct tcp_sock *tp = tcp_sk(sk);
-               if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
-                       tp->ucopy.dma_chan = get_softnet_dma();
-               if (tp->ucopy.dma_chan)
-                       ret = tcp_v4_do_rcv(sk, skb);
-               else
+                       struct tcp_sock *tp = tcp_sk(sk);
+                       if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
+                               tp->ucopy.dma_chan = get_softnet_dma();
+                       if (tp->ucopy.dma_chan)
+                               ret = tcp_v4_do_rcv(sk, skb);
+                       else
 #endif
-               {
-                       if (!tcp_prequeue(sk, skb))
-                       ret = tcp_v4_do_rcv(sk, skb);
-               }
-       } else
-               sk_add_backlog(sk, skb);
-       bh_unlock_sock(sk);
+                       {
+                               if (!tcp_prequeue(sk, skb))
+                                       ret = tcp_v4_do_rcv(sk, skb);
+                       }
+               } else
+                       sk_add_backlog(sk, skb);
+               bh_unlock_sock(sk);
+       }
 
        sock_put(sk);
 
@@ -1830,6 +1838,8 @@ struct proto tcp_prot = {
        .getsockopt             = tcp_getsockopt,
        .sendmsg                = tcp_sendmsg,
        .recvmsg                = tcp_recvmsg,
+       .async_recv             = tcp_async_recv,
+       .async_send             = tcp_async_send,
        .backlog_rcv            = tcp_v4_do_rcv,
        .hash                   = tcp_v4_hash,
        .unhash                 = tcp_unhash,

diff --git a/net/ipv6/tcp_ipv6.c b/net/ipv6/tcp_ipv6.c
index a50eb30..e27e231 100644
--- a/net/ipv6/tcp_ipv6.c
+++ b/net/ipv6/tcp_ipv6.c
@@ -1215,22 +1215,32 @@ process:
 
        skb->dev = NULL;
 
-       bh_lock_sock(sk);
        ret = 0;
-       if (!sock_owned_by_user(sk)) {
+       /*
+        * Async sockets are received directly under the socket spinlock,
+        * skipping the owned-by-user check, prequeue and backlog used below.
+        */
+       if (sock_async(sk)) {
+               spin_lock_bh(&sk->sk_lock.slock);
+               ret = tcp_v6_do_rcv(sk, skb);
+               spin_unlock_bh(&sk->sk_lock.slock);
+       } else {
+               bh_lock_sock(sk);
+               if (!sock_owned_by_user(sk)) {
 #ifdef CONFIG_NET_DMA
-                struct tcp_sock *tp = tcp_sk(sk);
-                if (tp->ucopy.dma_chan)
-                        ret = tcp_v6_do_rcv(sk, skb);
-                else
-#endif
-               {
-                       if (!tcp_prequeue(sk, skb))
+                       struct tcp_sock *tp = tcp_sk(sk);
+                       if (tp->ucopy.dma_chan)
                                ret = tcp_v6_do_rcv(sk, skb);
-               }
-       } else
-               sk_add_backlog(sk, skb);
-       bh_unlock_sock(sk);
+                       else
+#endif
+                       {
+                               if (!tcp_prequeue(sk, skb))
+                                       ret = tcp_v6_do_rcv(sk, skb);
+                       }
+               } else
+                       sk_add_backlog(sk, skb);
+               bh_unlock_sock(sk);
+       }
 
        sock_put(sk);
        return ret ? -1 : 0;
@@ -1580,6 +1586,8 @@ struct proto tcpv6_prot = {
        .getsockopt             = tcp_getsockopt,
        .sendmsg                = tcp_sendmsg,
        .recvmsg                = tcp_recvmsg,
+       .async_recv             = tcp_async_recv,
+       .async_send             = tcp_async_send,
        .backlog_rcv            = tcp_v6_do_rcv,
        .hash                   = tcp_v6_hash,
        .unhash                 = tcp_unhash,
-- 
        Evgeniy Polyakov
-
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to [EMAIL PROTECTED]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to