* Evgeniy Polyakov <[EMAIL PROTECTED]> wrote:

> I will use Ingo's evserver_threadlet server as plong as evserver_epoll 
> (with fixed closing) and evserver_kevent.c.

please also try evserver_epoll_threadlet.c that i've attached below - it 
uses epoll as the main event mechanism but does threadlets for request 
handling.

This is a one step more intelligent threadlet queueing model than 
'thousands of threads' - although obviously epoll alone should do well 
too with this trivial workload.

        Ingo

---------------------------->
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <sys/sendfile.h>
#include <sys/epoll.h>

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <time.h>
#include <ctype.h>
#include <netdb.h>

#define DEBUG           0

#include "syslet.h"
#include "sys.h"
#include "threadlet.h"

struct request {
        struct request *next_free;
        /*
         * The threadlet stack is part of the request structure
         * and is thus reused as threadlets complete:
         */
        unsigned long threadlet_stack;

        /*
         * These are all the request-specific parameters:
         */
        long sock;
};

/*
 * Freelist to recycle requests:
 */
static struct request *freelist;

/*
 * Allocate a request and set up its syslet atoms:
 */
static struct request *alloc_req(void)
{
        struct request *req;

        /*
         * Occasionally we have to refill the new-thread stack
         * entry:
         */
        if (!async_head.new_thread_stack) {
                async_head.new_thread_stack = thread_stack_alloc();
                pr("allocated new thread stack: %08lx\n",
                        async_head.new_thread_stack);
        }

        if (freelist) {
                req = freelist;
                pr("reusing req %p, threadlet stack %08lx\n",
                        req, req->threadlet_stack);
                freelist = freelist->next_free;
                req->next_free = NULL;
                return req;
        }

        req = calloc(1, sizeof(struct request));
        pr("allocated req %p\n", req);
        req->threadlet_stack = thread_stack_alloc();
        pr("allocated thread stack %08lx\n", req->threadlet_stack);

        return req;
}

/*
 * Check whether there are any completions queued for user-space
 * to finish up:
 */
static unsigned long complete(void)
{
        unsigned long completed = 0;
        struct request *req;

        for (;;) {
                req = (void *)completion_ring[async_head.user_ring_idx];
                if (!req)
                        return completed;
                completed++;
                pr("completed req %p (threadlet stack %08lx)\n",
                        req, req->threadlet_stack);

                req->next_free = freelist;
                freelist = req;

                /*
                 * Clear the completion pointer. To make sure the
                 * kernel never stomps upon still unhandled completions
                 * in the ring the kernel only writes to a NULL entry,
                 * so user-space has to clear it explicitly:
                 */
                completion_ring[async_head.user_ring_idx] = NULL;
                async_head.user_ring_idx++;
                if (async_head.user_ring_idx == MAX_PENDING)
                        async_head.user_ring_idx = 0;
        }
}

static unsigned int pending_requests;

/*
 * Handle a request that has just been submitted (either it has
 * already been executed, or we have to account it as pending):
 */
static void handle_submitted_request(struct request *req, long done)
{
        unsigned int nr;

        if (done) {
                /*
                 * This is the cached case - free the request:
                 */
                pr("cache completed req %p (threadlet stack %08lx)\n",
                        req, req->threadlet_stack);
                req->next_free = freelist;
                freelist = req;
                return;
        }
        /*
         * 'cachemiss' case - the syslet is not finished
         * yet. We will be notified about its completion
         * via the completion ring:
         */
        assert(pending_requests < MAX_PENDING-1);

        pending_requests++;
        pr("req %p is pending. %d reqs pending.\n", req, pending_requests);
        /*
         * Attempt to complete requests - this is a fast
         * check if there's no completions:
         */
        nr = complete();
        pending_requests -= nr;

        /*
         * If the ring is full then wait a bit:
         */
        while (pending_requests == MAX_PENDING-1) {
                pr("sys_async_wait()");
                /*
                 * Wait for 4 events - to batch things a bit:
                 */
                sys_async_wait(4, async_head.user_ring_idx, &async_head);
                nr = complete();
                pending_requests -= nr;
                pr("after wait: completed %d requests - still pending: %d\n",
                        nr, pending_requests);
        }
}

#include <linux/types.h>

//#define ulog(f, a...) fprintf(stderr, f, ##a)
#define ulog(f, a...)
#define ulog_err(f, a...) printf(f ": %s [%d].\n", ##a, strerror(errno), errno)


static int kevent_ctl_fd, main_server_s;

static void usage(char *p)
{
        ulog("Usage: %s -a addr -p port -f kevent_path -t timeout -w 
wait_num\n", p);
}

static int evtest_server_init(char *addr, unsigned short port)
{
        struct hostent *h;
        int s, on;
        struct sockaddr_in sa;

        if (!addr) {
                ulog("%s: Bind address cannot be NULL.\n", __func__);
                return -1;
        }

        h = gethostbyname(addr);
        if (!h) {
                ulog_err("%s: Failed to get address of %s.\n", __func__, addr);
                return -1;
        }

        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (s == -1) {
                ulog_err("%s: Failed to create server socket", __func__);
                return -1;
        }
        fcntl(s, F_SETFL, O_NONBLOCK);

        memcpy(&(sa.sin_addr.s_addr), h->h_addr_list[0], 4);
        sa.sin_port = htons(port);
        sa.sin_family = AF_INET;

        on = 1;
        setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, 4);

        if (bind(s, (struct sockaddr *)&sa, sizeof(struct sockaddr_in)) == -1) {
                ulog_err("%s: Failed to bind to %s", __func__, addr);
                close(s);
                return -1;
        }

        if (listen(s, 30000) == -1) {
                ulog_err("%s: Failed to listen on %s", __func__, addr);
                close(s);
                return -1;
        }

        return s;
}

static int evtest_kevent_remove(int fd)
{
        int err;
        struct epoll_event event;

        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;

        err = epoll_ctl(kevent_ctl_fd, EPOLL_CTL_DEL, fd, &event);
        if (err < 0) {
                ulog_err("Failed to perform control REMOVE operation");
                return err;
        }

        return err;
}

static int evtest_kevent_init(int fd)
{
        int err;
        struct timeval tm;
        struct epoll_event event;

        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;

        err = epoll_ctl(kevent_ctl_fd, EPOLL_CTL_ADD, fd, &event);
        gettimeofday(&tm, NULL);
        ulog("%08lu:%06lu: fd=%3d, err=%1d.\n", tm.tv_sec, tm.tv_usec, fd, err);
        if (err < 0) {
                ulog_err("Failed to perform control ADD operation: fd=%d, 
events=%08x", fd, event.events);
                return err;
        }

        return err;
}

static long handle_request(void *__req)
{
        struct request *req = __req;
        int s = req->sock, err, fd;
        off_t offset;
        int count;
        char path[] = "/tmp/index.html";
        char buf[4096];
        struct timeval tm;

        count = 40960;
        offset = 0;

        err = recv(s, buf, sizeof(buf), 0);
        if (err < 0) {
                ulog_err("Failed to read data from s=%d", s);
                goto err_out_remove;
        }
        if (err == 0) {
                gettimeofday(&tm, NULL);
                ulog("%08lu:%06lu: Client exited: fd=%d.\n", tm.tv_sec, 
tm.tv_usec, s);
                goto err_out_remove;
        }

        fd = open(path, O_RDONLY);
        if (fd == -1) {
                ulog_err("Failed to open '%s'", path);
                err = -1;
                goto err_out_remove;
        }
#if 0
        do {
                err = read(fd, buf, sizeof(buf));
                if (err <= 0)
                        break;
                err = send(s, buf, err, 0);
                if (err <= 0)
                        break;
        } while (1);
#endif
        err = sendfile(s, fd, &offset, count);
        {
                int on = 0;
                setsockopt(s, SOL_TCP, TCP_CORK, &on, sizeof(on));
        }

        close(fd);
        if (err < 0) {
                ulog_err("Failed send %d bytes: fd=%d.\n", count, s);
                goto err_out_remove;
        }

        gettimeofday(&tm, NULL);
        ulog("%08lu:%06lu: %d bytes has been sent to client fd=%d.\n", 
tm.tv_sec, tm.tv_usec, err, s);

        close(s);

        return complete_threadlet_fn(req, &async_head);

err_out_remove:
        evtest_kevent_remove(s);
        close(s);

        return complete_threadlet_fn(req, &async_head);
}

static int evtest_callback_client(int sock)
{
        struct request *req;
        long done;

        req = alloc_req();
        if (!req) {
                printf("no req\n");
                evtest_kevent_remove(sock);
                return -ENOMEM;
        }

        req->sock = sock;
        done = threadlet_exec(handle_request, req,
                        req->threadlet_stack, &async_head);

        handle_submitted_request(req, done);

        return 0;
}

static int evtest_callback_main(int s)
{
        int cs, err;
        struct sockaddr_in csa;
        socklen_t addrlen = sizeof(struct sockaddr_in);
        struct timeval tm;

        memset(&csa, 0, sizeof(csa));

        if ((cs = accept(s, (struct sockaddr *)&csa, &addrlen)) == -1) {
                ulog_err("Failed to accept client");
                return -1;
        }
        fcntl(cs, F_SETFL, O_NONBLOCK);

        gettimeofday(&tm, NULL);

        ulog("%08lu:%06lu: Accepted connect from %s:%d.\n",
                tm.tv_sec, tm.tv_usec,
                inet_ntoa(csa.sin_addr), ntohs(csa.sin_port));

        err = evtest_kevent_init(cs);
        if (err < 0) {
                close(cs);
                return -1;
        }

        return 0;
}

static int evtest_kevent_wait(unsigned int timeout, unsigned int wait_num)
{
        int num, err;
        struct timeval tm;
        struct epoll_event event[256];
        int i;

        err = epoll_wait(kevent_ctl_fd, event, 256, -1);
        if (err < 0) {
                ulog_err("Failed to perform control operation");
                return num;
        }

        gettimeofday(&tm, NULL);

        num = err;
        ulog("%08lu.%06lu: Wait: num=%d.\n", tm.tv_sec, tm.tv_usec, num);
        for (i=0; i<num; ++i) {
                if (event[i].data.fd == main_server_s)
                        err = evtest_callback_main(event[i].data.fd);
                else
                        err = evtest_callback_client(event[i].data.fd);
        }

        return err;
}

int main(int argc, char *argv[])
{
        int ch, err;
        char *addr;
        unsigned short port;
        unsigned int timeout, wait_num;

        addr = "0.0.0.0";
        port = 8080;
        timeout = 1000;
        wait_num = 1;

        async_head_init();

        while ((ch = getopt(argc, argv, "f:n:t:a:p:h")) > 0) {
                switch (ch) {
                        case 't':
                                timeout = atoi(optarg);
                                break;
                        case 'n':
                                wait_num = atoi(optarg);
                                break;
                        case 'a':
                                addr = optarg;
                                break;
                        case 'p':
                                port = atoi(optarg);
                                break;
                        case 'f':
                                break;
                        default:
                                usage(argv[0]);
                                return -1;
                }
        }

        kevent_ctl_fd = epoll_create(10);
        if (kevent_ctl_fd == -1) {
                ulog_err("Failed to epoll descriptor");
                return -1;
        }

        main_server_s = evtest_server_init(addr, port);
        if (main_server_s < 0)
                return main_server_s;

        err = evtest_kevent_init(main_server_s);
        if (err < 0)
                goto err_out_exit;

        while (1) {
                err = evtest_kevent_wait(timeout, wait_num);
        }

err_out_exit:
        close(kevent_ctl_fd);

        async_head_exit();

        return 0;
}
-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [EMAIL PROTECTED]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to