* Evgeniy Polyakov <[EMAIL PROTECTED]> wrote: > I will use Ingo's evserver_threadlet server as plong as evserver_epoll > (with fixed closing) and evserver_kevent.c.
please also try evserver_epoll_threadlet.c that i've attached below - it uses epoll as the main event mechanism but does threadlets for request handling. This is a one step more intelligent threadlet queueing model than 'thousands of threads' - although obviously epoll alone should do well too with this trivial workload. Ingo ----------------------------> #include <sys/types.h> #include <sys/socket.h> #include <sys/resource.h> #include <sys/wait.h> #include <sys/ioctl.h> #include <sys/stat.h> #include <sys/time.h> #include <sys/poll.h> #include <sys/sendfile.h> #include <sys/epoll.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <time.h> #include <ctype.h> #include <netdb.h> #define DEBUG 0 #include "syslet.h" #include "sys.h" #include "threadlet.h" struct request { struct request *next_free; /* * The threadlet stack is part of the request structure * and is thus reused as threadlets complete: */ unsigned long threadlet_stack; /* * These are all the request-specific parameters: */ long sock; }; /* * Freelist to recycle requests: */ static struct request *freelist; /* * Allocate a request and set up its syslet atoms: */ static struct request *alloc_req(void) { struct request *req; /* * Occasionally we have to refill the new-thread stack * entry: */ if (!async_head.new_thread_stack) { async_head.new_thread_stack = thread_stack_alloc(); pr("allocated new thread stack: %08lx\n", async_head.new_thread_stack); } if (freelist) { req = freelist; pr("reusing req %p, threadlet stack %08lx\n", req, req->threadlet_stack); freelist = freelist->next_free; req->next_free = NULL; return req; } req = calloc(1, sizeof(struct request)); pr("allocated req %p\n", req); req->threadlet_stack = thread_stack_alloc(); pr("allocated thread stack %08lx\n", req->threadlet_stack); return req; } /* * Check whether there are any completions queued for user-space * to finish up: */ static unsigned long complete(void) { unsigned long completed = 0; struct request *req; for (;;) { req = (void *)completion_ring[async_head.user_ring_idx]; if (!req) return completed; completed++; pr("completed req %p (threadlet stack %08lx)\n", req, req->threadlet_stack); req->next_free = freelist; freelist = req; /* * Clear the completion pointer. To make sure the * kernel never stomps upon still unhandled completions * in the ring the kernel only writes to a NULL entry, * so user-space has to clear it explicitly: */ completion_ring[async_head.user_ring_idx] = NULL; async_head.user_ring_idx++; if (async_head.user_ring_idx == MAX_PENDING) async_head.user_ring_idx = 0; } } static unsigned int pending_requests; /* * Handle a request that has just been submitted (either it has * already been executed, or we have to account it as pending): */ static void handle_submitted_request(struct request *req, long done) { unsigned int nr; if (done) { /* * This is the cached case - free the request: */ pr("cache completed req %p (threadlet stack %08lx)\n", req, req->threadlet_stack); req->next_free = freelist; freelist = req; return; } /* * 'cachemiss' case - the syslet is not finished * yet. We will be notified about its completion * via the completion ring: */ assert(pending_requests < MAX_PENDING-1); pending_requests++; pr("req %p is pending. %d reqs pending.\n", req, pending_requests); /* * Attempt to complete requests - this is a fast * check if there's no completions: */ nr = complete(); pending_requests -= nr; /* * If the ring is full then wait a bit: */ while (pending_requests == MAX_PENDING-1) { pr("sys_async_wait()"); /* * Wait for 4 events - to batch things a bit: */ sys_async_wait(4, async_head.user_ring_idx, &async_head); nr = complete(); pending_requests -= nr; pr("after wait: completed %d requests - still pending: %d\n", nr, pending_requests); } } #include <linux/types.h> //#define ulog(f, a...) fprintf(stderr, f, ##a) #define ulog(f, a...) #define ulog_err(f, a...) printf(f ": %s [%d].\n", ##a, strerror(errno), errno) static int kevent_ctl_fd, main_server_s; static void usage(char *p) { ulog("Usage: %s -a addr -p port -f kevent_path -t timeout -w wait_num\n", p); } static int evtest_server_init(char *addr, unsigned short port) { struct hostent *h; int s, on; struct sockaddr_in sa; if (!addr) { ulog("%s: Bind address cannot be NULL.\n", __func__); return -1; } h = gethostbyname(addr); if (!h) { ulog_err("%s: Failed to get address of %s.\n", __func__, addr); return -1; } s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (s == -1) { ulog_err("%s: Failed to create server socket", __func__); return -1; } fcntl(s, F_SETFL, O_NONBLOCK); memcpy(&(sa.sin_addr.s_addr), h->h_addr_list[0], 4); sa.sin_port = htons(port); sa.sin_family = AF_INET; on = 1; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, 4); if (bind(s, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) == -1) { ulog_err("%s: Failed to bind to %s", __func__, addr); close(s); return -1; } if (listen(s, 30000) == -1) { ulog_err("%s: Failed to listen on %s", __func__, addr); close(s); return -1; } return s; } static int evtest_kevent_remove(int fd) { int err; struct epoll_event event; event.events = EPOLLIN | EPOLLET; event.data.fd = fd; err = epoll_ctl(kevent_ctl_fd, EPOLL_CTL_DEL, fd, &event); if (err < 0) { ulog_err("Failed to perform control REMOVE operation"); return err; } return err; } static int evtest_kevent_init(int fd) { int err; struct timeval tm; struct epoll_event event; event.events = EPOLLIN | EPOLLET; event.data.fd = fd; err = epoll_ctl(kevent_ctl_fd, EPOLL_CTL_ADD, fd, &event); gettimeofday(&tm, NULL); ulog("%08lu:%06lu: fd=%3d, err=%1d.\n", tm.tv_sec, tm.tv_usec, fd, err); if (err < 0) { ulog_err("Failed to perform control ADD operation: fd=%d, events=%08x", fd, event.events); return err; } return err; } static long handle_request(void *__req) { struct request *req = __req; int s = req->sock, err, fd; off_t offset; int count; char path[] = "/tmp/index.html"; char buf[4096]; struct timeval tm; count = 40960; offset = 0; err = recv(s, buf, sizeof(buf), 0); if (err < 0) { ulog_err("Failed to read data from s=%d", s); goto err_out_remove; } if (err == 0) { gettimeofday(&tm, NULL); ulog("%08lu:%06lu: Client exited: fd=%d.\n", tm.tv_sec, tm.tv_usec, s); goto err_out_remove; } fd = open(path, O_RDONLY); if (fd == -1) { ulog_err("Failed to open '%s'", path); err = -1; goto err_out_remove; } #if 0 do { err = read(fd, buf, sizeof(buf)); if (err <= 0) break; err = send(s, buf, err, 0); if (err <= 0) break; } while (1); #endif err = sendfile(s, fd, &offset, count); { int on = 0; setsockopt(s, SOL_TCP, TCP_CORK, &on, sizeof(on)); } close(fd); if (err < 0) { ulog_err("Failed send %d bytes: fd=%d.\n", count, s); goto err_out_remove; } gettimeofday(&tm, NULL); ulog("%08lu:%06lu: %d bytes has been sent to client fd=%d.\n", tm.tv_sec, tm.tv_usec, err, s); close(s); return complete_threadlet_fn(req, &async_head); err_out_remove: evtest_kevent_remove(s); close(s); return complete_threadlet_fn(req, &async_head); } static int evtest_callback_client(int sock) { struct request *req; long done; req = alloc_req(); if (!req) { printf("no req\n"); evtest_kevent_remove(sock); return -ENOMEM; } req->sock = sock; done = threadlet_exec(handle_request, req, req->threadlet_stack, &async_head); handle_submitted_request(req, done); return 0; } static int evtest_callback_main(int s) { int cs, err; struct sockaddr_in csa; socklen_t addrlen = sizeof(struct sockaddr_in); struct timeval tm; memset(&csa, 0, sizeof(csa)); if ((cs = accept(s, (struct sockaddr *)&csa, &addrlen)) == -1) { ulog_err("Failed to accept client"); return -1; } fcntl(cs, F_SETFL, O_NONBLOCK); gettimeofday(&tm, NULL); ulog("%08lu:%06lu: Accepted connect from %s:%d.\n", tm.tv_sec, tm.tv_usec, inet_ntoa(csa.sin_addr), ntohs(csa.sin_port)); err = evtest_kevent_init(cs); if (err < 0) { close(cs); return -1; } return 0; } static int evtest_kevent_wait(unsigned int timeout, unsigned int wait_num) { int num, err; struct timeval tm; struct epoll_event event[256]; int i; err = epoll_wait(kevent_ctl_fd, event, 256, -1); if (err < 0) { ulog_err("Failed to perform control operation"); return num; } gettimeofday(&tm, NULL); num = err; ulog("%08lu.%06lu: Wait: num=%d.\n", tm.tv_sec, tm.tv_usec, num); for (i=0; i<num; ++i) { if (event[i].data.fd == main_server_s) err = evtest_callback_main(event[i].data.fd); else err = evtest_callback_client(event[i].data.fd); } return err; } int main(int argc, char *argv[]) { int ch, err; char *addr; unsigned short port; unsigned int timeout, wait_num; addr = "0.0.0.0"; port = 8080; timeout = 1000; wait_num = 1; async_head_init(); while ((ch = getopt(argc, argv, "f:n:t:a:p:h")) > 0) { switch (ch) { case 't': timeout = atoi(optarg); break; case 'n': wait_num = atoi(optarg); break; case 'a': addr = optarg; break; case 'p': port = atoi(optarg); break; case 'f': break; default: usage(argv[0]); return -1; } } kevent_ctl_fd = epoll_create(10); if (kevent_ctl_fd == -1) { ulog_err("Failed to epoll descriptor"); return -1; } main_server_s = evtest_server_init(addr, port); if (main_server_s < 0) return main_server_s; err = evtest_kevent_init(main_server_s); if (err < 0) goto err_out_exit; while (1) { err = evtest_kevent_wait(timeout, wait_num); } err_out_exit: close(kevent_ctl_fd); async_head_exit(); return 0; } - To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to [EMAIL PROTECTED] More majordomo info at http://vger.kernel.org/majordomo-info.html Please read the FAQ at http://www.tux.org/lkml/