I am happy to see this e-mail because I can contribute some thing :-)...
This is exactly the same need for me and I write some code to do this. They are 
in attachment files.
The key to accomplish this goal is:
1. create new event loop for per worker thread, with a pipe and a extra ev_io 
to watch pipe's read event. This is done before the worker thread start, so the 
main thread(listen thread) can hold these in a struct in a list. It is 
important to know that pipe's read event is watched in the worker thread's 
event loop, not the main event loop.
2. once new connection accepted, the listen thread send the accepted socket fd 
on pipe, just using round robin to determine which thread struct(this contain 
the pipe) to use
3. the worker thread use a particular callback for the pipe read watcher. In 
this callback(dispatched_socket_arrived in example), create normal read/write 
watchers for socket fd just received on pipe. Just start the read watcher, not 
start the write watcher for this socket fd
4. so data send/received on this socket fd is processed on worker thread.
I hope this can help you.


在2013年01月31 22时31分,"Chris Herssens"<chris.herss...@gmail.com> 写道:


I want to use libev with multiple threads for the handling of tcp connections. 
What I want to is:

The main thread listen on incoming connections, accept the connections and 
forward the connection to a workerthread.

I have a pool of workerthreads. The number of threads depends on the number of 
cpu's. Each worker-thread has an event loop. The worker-thread listen if I can 
write on the tcp socket or if somethings available for reading.

I looked into the documentation of libev and I known this can be done with 
libev, but I can't find any example how I have to do that.
Does someone has an example? I think that I have to use the ev_loop_new() api, 
for the worker-threads and for the main thread I have to use the 
ev_default_loop() ?

Since I new in using libev I really need an example how to dispatch an incoming 
tcp connection to a worker-thread. I have started to implement it, but I don't 
get it work. In the main thread I start the default loop. And if I received new 
connection I create a watcher and use the ev_io_start with the loop that I 
create in the workerthread. It seems that the callback function of the watcher 
is not always triggered.  Also Is there a way to block the ev_loop if there no 
active watchers ?




regards.


#include <signal.h>
#include <stdarg.h>
#include <memory>
#include <cstring>
#include <set>
#include <boost/scoped_array.hpp>
#include <csf/logging.h>
#include "socket_server.h"
using namespace csf;

// the worker thread holding a event loop
struct socket_thread
{
    pthread_t         thread;
    struct ev_loop*   event_loop;
    struct ev_io*     notify_event;
    int               notify_receive_fd;
    int               notify_send_fd;
        server_socket*    svrsocket;
};

struct socket_listener
{
    int               socket_fd;
    struct ev_io      watcher;
    server_socket*    svrsocket;

    socket_listener*  next;
};

// the data object used to take socket fd from accept thread to worker thread
struct socket_connection_info
{
        int socket_fd;
        struct sockaddr_in addr;
};

// a server socket accepting client connection
struct server_socket
{
        server_socket(const char* name, socket_connection_factory* f);
        ~server_socket();
        bool create(const socket_settings& settings);
        bool create_worker_threads();
        bool dispatch_connection(const socket_connection_info& info);
        void cleanup();
        bool is_tcp() { return settings.transport == tcp_transport; };

        const char*                socket_name;
        socket_settings            settings;
        socket_connection_factory* factory;
        socket_listener*           listeners;
        socket_thread*             worker_threads;
        pthread_t                  dispatch_thread;
        struct ev_loop*            event_loop;
        int                        last_dispatched;

        server_socket*             next;
};

/////////////////////////////////////////////////////////////////////////////
socket_settings::socket_settings()
{
        strcpy(inter, "127.0.0.1");
    port             = 0;
        transport        = tcp_transport;
        max_conns        = 1024;
    num_threads      = 8;
    reqs_per_event   = 20;
    backlog          = 1024;
        read_buffer_size = 4096;
        mtu              = 1500;
}

/////////////////////////////////////////////////////////////////////////////
socket_connection::socket_connection(socket_connection_factory* f) : 
factory(f), transport(tcp_transport), status(connection_closed),
                socket_fd(0), read_watcher(NULL), event_loop(NULL), 
read_buffer_size(0), read_bytes(0), read_buffer(NULL), message(NULL)
{
}

socket_connection::~socket_connection()
{
}

void socket_connection::close(bool force)
{
        if (message != NULL && (unsigned char*)message != read_buffer)
                freemsg(message);
        message = NULL;

        if (read_buffer)
                free(read_buffer);
        read_buffer = NULL;
        read_buffer_size = read_bytes = 0;

        status = connection_closed;
}

tcp_connection::tcp_connection(socket_connection_factory* f, int sfd, const 
struct sockaddr_in& addr) : socket_connection(f),
                write_notify(NULL), write_watcher(NULL), write_buffer_count(0), 
write_buffer_used(0), write_buffers(NULL)
{
        transport = tcp_transport;
        socket_fd = sfd;
        memcpy(&client_addr, &addr, sizeof(struct sockaddr_in));
        status = connection_opened;
        pthread_mutex_init(&write_mutex, NULL);
}

tcp_connection::~tcp_connection()
{
        pthread_mutex_destroy(&write_mutex);
}

msg_read_result tcp_connection::read_message()
{
        if (message == NULL) {
                // read message header
                ssize_t rbs = read(socket_fd, read_buffer+read_bytes, 
MSG_HEADER_LEN-read_bytes);
                if (rbs == 0)
                        return msg_read_error;
                if (rbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        PLOG_ERROR("Can't read tcp message header");
                        return msg_read_error;
                }
                read_bytes += rbs;
                if (read_bytes < MSG_HEADER_LEN)
                        return msg_read_partial; // not read whole header

                // the header is read, check if it is necessary to allocate 
space for 'big' message
                struct msg_header* header = (struct msg_header*)read_buffer;
                if (!verify_msg_header(header)) {
                        LOG_ERROR("Invalid tcp message 
header("<<header->magic<<","<<header->type<<","<<header->msgid<<","<<header->length<<")");
                        return msg_read_error;
                }
                message = (struct csf_message*)read_buffer;
                if ((read_buffer_size-MSG_HEADER_LEN) < header->length) {
                        // not enough space to hold 'big' message
                        message = allocmsg(0, header->length);
                        if (message == NULL) {
                                LOG_ERROR("Can't allocate memory for large 
message("<<header->length<<" bytes).");
                                return msg_read_error;
                        }
                        memcpy(message, read_buffer, MSG_HEADER_LEN);
                }
        }
        // read message body
        struct msg_header* header = (struct msg_header*)read_buffer;
        size_t rlen = header->length - (read_bytes - MSG_HEADER_LEN);
        ssize_t rbs = read(socket_fd, ((unsigned char*)message)+read_bytes, 
rlen);
        if (rbs == 0)
                return msg_read_error;
        if (rbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                PLOG_ERROR("Can't read tcp message content");
                return msg_read_error;
        }
        read_bytes += rbs;
        return rbs == ssize_t(rlen) ? msg_read_complete : msg_read_partial;
}

bool tcp_connection::write_message(const struct csf_message* msg)
{
    // because it is possible to write message to this connection from other 
thread
        pthread_mutex_lock(&write_mutex);
        if (status == connection_closing || status == connection_closed) {
                pthread_mutex_unlock(&write_mutex);
                return false;
        }
        // check write buffer list
        if (write_buffer_count == 0) {
                // it is the first time to write message, init the buffer list
                write_buffers = (struct iovec*)malloc(sizeof(struct iovec));
                if (write_buffers==NULL) {
                        pthread_mutex_unlock(&write_mutex);
                        LOG_ERROR("Can't allocate memory for 1 write buffers.");
                        return false;
                }
                bzero(write_buffers, sizeof(struct iovec));
                write_buffer_count = 1;
                write_buffer_used  = 0;
        } else if (write_buffer_used == write_buffer_count) {
                // the buffer list is full
                struct iovec* new_buffers = (struct 
iovec*)realloc(write_buffers, sizeof(struct iovec)*write_buffer_count*2);
                if (new_buffers == NULL) {
                        pthread_mutex_unlock(&write_mutex);
                        LOG_ERROR("Can't allocate memory for 
"<<write_buffer_count*2<<" write buffers.");
                        return false;
                }
                bzero(new_buffers+write_buffer_count, sizeof(struct 
iovec)*write_buffer_count);
                write_buffers = new_buffers;
                write_buffer_count *= 2;
        }

        // save the message
        struct iovec& msg_iovec = write_buffers[write_buffer_used];
        msg_iovec.iov_base = (void*)msg;
        msg_iovec.iov_len  = MSG_HEADER_LEN + msg->header.length;
        write_buffer_used++;

        // start up EV_WRITE event
        pthread_t self = pthread_self();
        if (ev_userdata(event_loop) == (void*)self) {
                if (!ev_is_active(write_watcher))
                        ev_io_start(event_loop, write_watcher);
        } else
                ev_async_send(event_loop, write_notify);
        pthread_mutex_unlock(&write_mutex);

        return true;
}

bool tcp_connection::reply_success(const char* info)
{
        if (status == connection_closing || status == connection_closed)
                return false;
        size_t body_len = sizeof(msg_result) + (info==NULL ? 0 : strlen(info));
        csf_message* msg = allocmsg(MSG_RESULT, body_len);
        msg->result.errorno = ERR_SUCCESS;
        if (info != NULL)
                strcpy(msg->result.sval, info);
        bool retn = write_message(msg);
        if (!retn) freemsg(msg);
        return retn;
}

bool tcp_connection::reply_success(int32_t val)
{
        if (status == connection_closing || status == connection_closed)
                return false;
        csf_message* msg = allocmsg(MSG_RESULT, sizeof(msg_result));
        msg->result.ival = val;
        bool retn = write_message(msg);
        if (!retn) freemsg(msg);
        return retn;
}

bool tcp_connection::reply_success(double val)
{
        if (status == connection_closing || status == connection_closed)
                return false;
        csf_message* msg = allocmsg(MSG_RESULT, sizeof(msg_result));
        msg->result.dval = val;
        bool retn = write_message(msg);
        if (!retn) freemsg(msg);
        return retn;
}

bool tcp_connection::reply_error(uint16_t code, const char* info)
{
        if (status == connection_closing || status == connection_closed)
                return false;
        size_t body_len = sizeof(msg_result) + (info==NULL ? 0 : strlen(info));
        csf_message* msg = allocmsg(MSG_RESULT, body_len);
        msg->result.errorno = code;
        if (info != NULL)
                strcpy(msg->result.sval, info);
        DLOG_ERROR("Error Code: "<<code<<"; Error Message: 
"<<(info==NULL?"":info));
        bool retn = write_message(msg);
        if (!retn) freemsg(msg);
        return retn;
}

void tcp_connection::close(bool force)
{
        pthread_mutex_lock(&write_mutex);

        if (force == false && write_buffer_used != 0) {
                status = connection_closing;
                pthread_mutex_unlock(&write_mutex);
                return;
        }

        if (event_loop && read_watcher) {
                ev_io_stop(event_loop, read_watcher);
                free(read_watcher);
                read_watcher = NULL;
        }
        if (event_loop && write_watcher) {
                ev_io_stop(event_loop, write_watcher);
                free(write_watcher);
                write_watcher = NULL;
        }
        event_loop = NULL;

        if (socket_fd != 0)
                ::close(socket_fd);
        socket_fd = 0;

        for(int i=0; i<write_buffer_used; ++i)
                freemsg((struct csf_message*)(write_buffers[i].iov_base));
        free(write_buffers);
        write_buffers = NULL;
        write_buffer_count = write_buffer_used = 0;

        socket_connection::close(force);

        pthread_mutex_unlock(&write_mutex);
}

udp_connection::udp_connection(socket_connection_factory* f, int sfd) : 
socket_connection(f)
{
        transport = udp_transport;
        socket_fd = sfd;
        bzero(&client_addr, sizeof(struct sockaddr_in));
        status = connection_opened;
}

udp_connection::~udp_connection()
{
}

msg_read_result udp_connection::read_message()
{
        socklen_t addr_size = sizeof(client_addr);
        int rbs = recvfrom(socket_fd, read_buffer, read_buffer_size, 0, 
(sockaddr*)&client_addr, &addr_size);
        if (rbs == 0) {
                PLOG_ERROR("Can't read udp message");
                return msg_read_error;
        }
        if (rbs == -1) {
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        PLOG_ERROR("Can't read udp message");
                        return msg_read_error;
                }
                return msg_read_partial;
        }
        read_bytes = rbs;
        struct msg_header* header = (struct msg_header*)read_buffer;
        if (!verify_msg_header(header) || header->length != 
(read_bytes-MSG_HEADER_LEN)) {
                LOG_ERROR("Invalid udp message: "<<read_bytes<<" bytes, 
header("<<header->magic<<","<<header->type<<","<<header->msgid<<","<<header->length<<")");
                return msg_read_error;
        }
        message = (struct csf_message*)read_buffer;
        return msg_read_complete;
}

void udp_connection::close(bool force)
{
        socket_connection::close(force);
        ev_break(event_loop, EVBREAK_ALL);
}

socket_connection_factory::~socket_connection_factory()
{
}

/////////////////////////////////////////////////////////////////////////////
static void* event_loop_thread(void *arg);

socket_server::socket_server() : sockets(NULL)
{
}

socket_server::~socket_server()
{
}

void socket_server::cleanup()
{
        server_socket* sock = NULL;
        while( sockets ) {
                sock = sockets->next;
                delete sockets;
                sockets = sock;
        }
}

bool socket_server::create_server_socket(const char* name, const 
socket_settings& settings, socket_connection_factory* factory)
{
        std::auto_ptr<server_socket> sock(new server_socket(name, factory));
        sock->event_loop = ev_loop_new(EVFLAG_AUTO);

        if (!sock->create(settings))
                return false;
        ev_set_userdata(sock->event_loop, sock.get());

        if (sock->is_tcp() && !sock->create_worker_threads())
                return false;

        pthread_attr_t attr;
        pthread_attr_init(&attr);
        if (int ret = pthread_create(&(sock->dispatch_thread), &attr, 
event_loop_thread, sock->event_loop) != 0) {
                LOG_ERROR("Can't create worker thread: "<<strerror(ret));
                return false;
        }
        
        sock->next = sockets;
        sockets = sock.release();
        return true;
}

void socket_server::wait_for_shutdown()
{
        server_socket* svrsocket = sockets;
        while( svrsocket ) {
                pthread_join(svrsocket->dispatch_thread, NULL);
                svrsocket = svrsocket->next;
        }
}

/////////////////////////////////////////////////////////////////////////////
static void socket_connection_arrived(struct ev_loop *loop, struct ev_io 
*watcher, int revents);
static void dispatched_socket_arrived(struct ev_loop* loop, struct ev_io* 
watcher, int revents);
static void socket_data_arrived(struct ev_loop *loop, struct ev_io *watcher, 
int revents);
static void socket_data_writable(struct ev_loop *loop, struct ev_io *watcher, 
int revents);
static void start_socket_write(struct ev_loop* loop, struct ev_async* w, int 
revents);

server_socket::server_socket(const char* name, socket_connection_factory* f) : 
socket_name(strdup(name)), factory(f), listeners(NULL),
        worker_threads(NULL), dispatch_thread(0), event_loop(NULL), 
last_dispatched(-1), next(NULL)
{
}

server_socket::~server_socket()
{
        cleanup();
}

bool server_socket::create(const socket_settings& settings)
{
        memcpy(&(this->settings), &settings, sizeof(socket_settings));

    addrinfo hints;
    bzero(&hints, sizeof(addrinfo));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = is_tcp() ? SOCK_STREAM : SOCK_DGRAM;

    char port_buf[NI_MAXSERV] = {0};
    snprintf(port_buf, sizeof(port_buf), "%d", settings.port);

        addrinfo* ai = NULL;
    int error = getaddrinfo(settings.inter, port_buf, &hints, &ai);
    if (error != 0) {
        LOG_ERROR("getaddrinfo(): "<<(error==EAI_SYSTEM ? strerror(errno) : 
gai_strerror(error)));
        return false;
    }

        int sfd = -1;
    for (addrinfo* next = ai; next; next= next->ai_next) {
                if ((sfd = socket(next->ai_family, next->ai_socktype, 
next->ai_protocol)) == -1)
                        continue;
                int flags;
                if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, 
flags | O_NONBLOCK) < 0) {
                        PLOG_ERROR("Setting O_NONBLOCK on socket");
                        close(sfd);
                        continue;
                }

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, 
sizeof(flags));
                if (is_tcp()) {
                        error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void 
*)&flags, sizeof(flags));
                        if (error != 0)
                                PLOG_ERROR("setsockopt()");

                        linger ling = {0, 0};
                        error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void 
*)&ling, sizeof(ling));
                        if (error != 0)
                                PLOG_ERROR("setsockopt()");

                        error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void 
*)&flags, sizeof(flags));
                        if (error != 0)
                                PLOG_ERROR("setsockopt()");
                }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                PLOG_ERROR("bind()");
                goto error;
            }
            close(sfd);
            continue;
        } else {
                struct socket_listener* listener = (struct 
socket_listener*)malloc(sizeof(struct socket_listener));
                listener->socket_fd = sfd;
                listener->svrsocket = this;
            if (is_tcp()) {
                listener->watcher.data = listener;
                                if (listen(sfd, settings.backlog) == -1) {
                                        PLOG_ERROR("listen()");
                                        goto error;
                                }
                        } else {
                                udp_connection* conn = 
(udp_connection*)factory->create_connection(udp_transport, socket_name, sfd, 
NULL);
                                if (conn == NULL) {
                                        LOG_ERROR("Fail to create udp 
connection from the factory.");
                                        goto error;
                                }
                                listener->watcher.data = conn;
                                conn->read_watcher = &listener->watcher;
                                conn->event_loop = event_loop;
                                conn->read_buffer_size = 
settings.read_buffer_size;
                                conn->read_bytes = 0;
                                conn->read_buffer = (unsigned 
char*)malloc(conn->read_buffer_size);
                                if (conn->read_buffer == NULL) {
                                        LOG_ERROR("Fail to allocate read buffer 
memory("<<conn->read_buffer_size<<" bytes) for new connection.");
                                        factory->destory_connection(conn);
                                        goto error;
                                }
                        }
                        listener->next = listeners;
                        listeners = listener;

                        ev_io_init(&(listener->watcher), 
is_tcp()?socket_connection_arrived:socket_data_arrived, listener->socket_fd, 
EV_READ);
                        ev_io_start(event_loop, &(listener->watcher));
        }
    }
    freeaddrinfo(ai);
        usleep(1000);
    return true;

error:
        close(sfd);
    freeaddrinfo(ai);
    return false;
}

bool server_socket::create_worker_threads()
{
        worker_threads = (struct socket_thread*)malloc(sizeof(struct 
socket_thread)*settings.num_threads);
        bzero(worker_threads, sizeof(socket_thread)*settings.num_threads);

        for (int i = 0; i < settings.num_threads; i++) {
                int fds[2];
                if (pipe(fds)) {
                        PLOG_ERROR("Can't create notify pipe");
                        return false;
                }
                
                socket_thread* worker = &(worker_threads[i]);
                worker->notify_receive_fd = fds[0];
                worker->notify_send_fd = fds[1];
                worker->svrsocket = this;
                worker->event_loop = ev_loop_new(EVFLAG_AUTO);
                if( !worker->event_loop ) {
                        LOG_ERROR("Can't allocate event loop for worker 
thread.");
                        return false;
                }
                worker->notify_event = (struct ev_io*)malloc(sizeof(struct 
ev_io));
                ev_io_init (worker->notify_event, dispatched_socket_arrived, 
worker->notify_receive_fd, EV_READ);
                worker->notify_event->data = worker;
                ev_io_start (worker->event_loop, worker->notify_event);

                pthread_attr_t attr;
                pthread_attr_init(&attr);
                if (int ret = pthread_create(&(worker->thread), &attr, 
event_loop_thread, worker->event_loop) != 0) {
                        LOG_ERROR("Can't create worker thread: 
"<<strerror(ret));
                        return false;
                }
        }
        return true;
}

bool server_socket::dispatch_connection(const socket_connection_info& info)
{
        int tid = (last_dispatched + 1) % settings.num_threads;
        last_dispatched = tid;

    socket_thread* thread = worker_threads + tid;
    size_t len = sizeof(struct socket_connection_info);
    if (write(thread->notify_send_fd, &info, len) != int(len)) {
        PLOG_ERROR("Fail to writing to connection notify pipe");
                return false;
        }
        return true;
}

void server_socket::cleanup()
{       
        free((void*)socket_name);
        socket_name = NULL;

        socket_listener* sl = NULL;
        while( listeners ) {
                ev_io_stop(event_loop, &(listeners->watcher));
                close(listeners->socket_fd);
                sl = listeners->next;
                free(listeners);
                listeners = sl;
        }
        
        if (worker_threads) {
                for(int i=0; i<settings.num_threads; ++i)
                {
                        socket_thread* thread = &worker_threads[i];
                
                        ev_io_stop(thread->event_loop, thread->notify_event);
                        free(thread->notify_event);
                        ev_loop_destroy(thread->event_loop);
                
                        close(thread->notify_receive_fd);
                        close(thread->notify_send_fd);
                }
                free(worker_threads);
                worker_threads = NULL;
        }
        
        last_dispatched = -1;
        
        ev_loop_destroy(event_loop);
        event_loop = NULL;
}

////////////////////////////////////////////////////////////////////////
static void* event_loop_thread(void *arg)
{
        struct ev_loop* event_loop = (struct ev_loop*)arg;
        if (ev_userdata(event_loop) == NULL) {
                pthread_t self = pthread_self();
                ev_set_userdata(event_loop, (void*)self);
        }
    ev_run (event_loop, 0);
    return NULL;
}

static void socket_connection_arrived(struct ev_loop *loop, struct ev_io 
*watcher, int revents)
{
    socket_listener* listener = (socket_listener*)watcher->data;

        struct socket_connection_info info;
        socklen_t addrlen = sizeof(info.addr);
        if ((info.socket_fd = accept(listener->socket_fd, 
(sockaddr*)&info.addr, &addrlen)) == -1) {
                if (errno != EAGAIN && errno != EWOULDBLOCK)
                        PLOG_ERROR("Accept socket");
                return;
        }
        int flags = 1;
        if ((flags = fcntl(info.socket_fd, F_GETFL, 0)) < 0 || 
fcntl(info.socket_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
                PLOG_ERROR("Setting O_NONBLOCK on accepted socket");
                close(info.socket_fd);
                return;
        }
        if (!listener->svrsocket->dispatch_connection(info)) {
                close(info.socket_fd);
                return;
        }
}

static void dispatched_socket_arrived(struct ev_loop* loop, struct ev_io* 
watcher, int revents)
{
    socket_thread* thread = (socket_thread*)watcher->data;

    struct socket_connection_info info;
    size_t len = sizeof(struct socket_connection_info);
    if (read(watcher->fd, &info, len) != int(len)) {
        PLOG_ERROR("Can't read from connection notify pipe");
        return;
    }

        socket_connection_factory* factory = thread->svrsocket->factory;
        tcp_connection* conn = 
(tcp_connection*)factory->create_connection(tcp_transport, 
thread->svrsocket->socket_name, info.socket_fd, &info.addr);
        if (conn == NULL) {
                LOG_ERROR("Fail to create tcp connection from the factory.");
                return;
        }
        conn->read_watcher = (struct ev_io*)malloc(sizeof(struct ev_io));
        conn->read_watcher->data = conn;
        conn->write_notify  = (struct ev_async*)malloc(sizeof(struct ev_async));
        conn->write_notify->data = conn;
        conn->write_watcher = (struct ev_io*)malloc(sizeof(struct ev_io));
        conn->write_watcher->data = conn;
        conn->read_buffer_size = thread->svrsocket->settings.read_buffer_size;
        conn->read_bytes = 0;
        conn->read_buffer = (unsigned char*)malloc(conn->read_buffer_size);
        if (conn->read_buffer == NULL) {
                LOG_ERROR("Fail to allocate read buffer 
memory("<<conn->read_buffer_size<<" bytes) for new connection.");
                factory->destory_connection(conn);
                return;
        }
        conn->event_loop = thread->event_loop;

        ev_io_init(conn->read_watcher, socket_data_arrived, info.socket_fd, 
EV_READ);
        ev_io_init(conn->write_watcher, socket_data_writable, info.socket_fd, 
EV_WRITE);
        ev_io_start(thread->event_loop, conn->read_watcher);
        ev_async_init(conn->write_notify, start_socket_write);
        ev_async_start(thread->event_loop, conn->write_notify);
}

static void reset_connection_read(socket_connection* conn)
{
        conn->read_bytes = 0;
        if ((unsigned char*)(conn->message) != conn->read_buffer)
                freemsg(conn->message);
        conn->message = NULL;
}

static void socket_data_arrived(struct ev_loop *loop, struct ev_io *watcher, 
int revents)
{
    socket_connection* conn = (socket_connection*)watcher->data;
        switch (conn->read_message()) {
        case msg_read_complete:
                conn->process_message(conn->message);
                reset_connection_read(conn);
                break;
        case msg_read_partial:
                return;
        case msg_read_error:
        default: {
                conn->close(true);
                conn->factory->destory_connection(conn);
        }
        break;
        }
}

static void start_socket_write(struct ev_loop* loop, struct ev_async* w, int 
revents)
{
        tcp_connection* conn = (tcp_connection*)w->data;
        if (!ev_is_active(conn->write_watcher))
                ev_io_start(conn->event_loop, conn->write_watcher);
}

static void socket_data_writable(struct ev_loop *loop, struct ev_io *watcher, 
int revents)
{
        tcp_connection* conn = (tcp_connection*)watcher->data;
        // because it is possible to write message to this connection from 
other thread
        pthread_mutex_lock(&(conn->write_mutex));

        if (conn->write_buffer_used == 0)
                return;
        int iovecs_written = 0, remainder = 0;
        ssize_t wbs = writev(conn->socket_fd, conn->write_buffers, 
conn->write_buffer_used);
        if (wbs == 0) {
                LOG_ERROR("Can't send message to remote client: 
socket("<<conn->socket_fd<<") is closed.");
                goto destroy_connection;
        } else if (wbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                PLOG_ERROR("Can't send message to remote client");
                goto destroy_connection;
        }
        for (int i=0; i<conn->write_buffer_used; ++i) {
                struct iovec& vec = conn->write_buffers[i];
                if (wbs > ssize_t(vec.iov_len)) {
                        freemsg((struct csf_message*)vec.iov_base);
                        vec.iov_base = NULL;
                        iovecs_written++;
                        wbs -= vec.iov_len;
                } else if (wbs == ssize_t(vec.iov_len)) {
                        freemsg((struct csf_message*)vec.iov_base);
                        vec.iov_base = NULL;
                        iovecs_written++;
                        break;
                } else {
                        remainder = vec.iov_len - wbs;
                        break;
                }
        }
        conn->write_buffer_used -= iovecs_written;
        if (conn->write_buffer_used == 0) {
                if (conn->status == connection_closing)
                        goto destroy_connection;
                else
                        ev_io_stop(conn->event_loop, conn->write_watcher);
        }
        else {
                memmove(conn->write_buffers, 
conn->write_buffers+iovecs_written, sizeof(struct 
iovec)*conn->write_buffer_used);
                bzero(conn->write_buffers+conn->write_buffer_used, 
sizeof(struct iovec)*iovecs_written);
                if (remainder > 0) {
                        unsigned char* iov_base = (unsigned 
char*)conn->write_buffers[0].iov_base;
                        memmove(iov_base, 
iov_base+(conn->write_buffers[0].iov_len-remainder), remainder);
                        conn->write_buffers[0].iov_len = remainder;
                }
        }

        pthread_mutex_unlock(&(conn->write_mutex));
        return;

destroy_connection:
        pthread_mutex_unlock(&(conn->write_mutex));
        conn->close(true);
        conn->factory->destory_connection(conn);
}

Attachment: socket_server.h
Description: Binary data

_______________________________________________
libev mailing list
libev@lists.schmorp.de
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev

Reply via email to