I am happy to see this e-mail because I can contribute some thing :-)... This is exactly the same need for me and I write some code to do this. They are in attachment files. The key to accomplish this goal is: 1. create new event loop for per worker thread, with a pipe and a extra ev_io to watch pipe's read event. This is done before the worker thread start, so the main thread(listen thread) can hold these in a struct in a list. It is important to know that pipe's read event is watched in the worker thread's event loop, not the main event loop. 2. once new connection accepted, the listen thread send the accepted socket fd on pipe, just using round robin to determine which thread struct(this contain the pipe) to use 3. the worker thread use a particular callback for the pipe read watcher. In this callback(dispatched_socket_arrived in example), create normal read/write watchers for socket fd just received on pipe. Just start the read watcher, not start the write watcher for this socket fd 4. so data send/received on this socket fd is processed on worker thread. I hope this can help you.
在2013年01月31 22时31分,"Chris Herssens"<chris.herss...@gmail.com> 写道: I want to use libev with multiple threads for the handling of tcp connections. What I want to is: The main thread listen on incoming connections, accept the connections and forward the connection to a workerthread. I have a pool of workerthreads. The number of threads depends on the number of cpu's. Each worker-thread has an event loop. The worker-thread listen if I can write on the tcp socket or if somethings available for reading. I looked into the documentation of libev and I known this can be done with libev, but I can't find any example how I have to do that. Does someone has an example? I think that I have to use the ev_loop_new() api, for the worker-threads and for the main thread I have to use the ev_default_loop() ? Since I new in using libev I really need an example how to dispatch an incoming tcp connection to a worker-thread. I have started to implement it, but I don't get it work. In the main thread I start the default loop. And if I received new connection I create a watcher and use the ev_io_start with the loop that I create in the workerthread. It seems that the callback function of the watcher is not always triggered. Also Is there a way to block the ev_loop if there no active watchers ? regards.
#include <signal.h> #include <stdarg.h> #include <memory> #include <cstring> #include <set> #include <boost/scoped_array.hpp> #include <csf/logging.h> #include "socket_server.h" using namespace csf; // the worker thread holding a event loop struct socket_thread { pthread_t thread; struct ev_loop* event_loop; struct ev_io* notify_event; int notify_receive_fd; int notify_send_fd; server_socket* svrsocket; }; struct socket_listener { int socket_fd; struct ev_io watcher; server_socket* svrsocket; socket_listener* next; }; // the data object used to take socket fd from accept thread to worker thread struct socket_connection_info { int socket_fd; struct sockaddr_in addr; }; // a server socket accepting client connection struct server_socket { server_socket(const char* name, socket_connection_factory* f); ~server_socket(); bool create(const socket_settings& settings); bool create_worker_threads(); bool dispatch_connection(const socket_connection_info& info); void cleanup(); bool is_tcp() { return settings.transport == tcp_transport; }; const char* socket_name; socket_settings settings; socket_connection_factory* factory; socket_listener* listeners; socket_thread* worker_threads; pthread_t dispatch_thread; struct ev_loop* event_loop; int last_dispatched; server_socket* next; }; ///////////////////////////////////////////////////////////////////////////// socket_settings::socket_settings() { strcpy(inter, "127.0.0.1"); port = 0; transport = tcp_transport; max_conns = 1024; num_threads = 8; reqs_per_event = 20; backlog = 1024; read_buffer_size = 4096; mtu = 1500; } ///////////////////////////////////////////////////////////////////////////// socket_connection::socket_connection(socket_connection_factory* f) : factory(f), transport(tcp_transport), status(connection_closed), socket_fd(0), read_watcher(NULL), event_loop(NULL), read_buffer_size(0), read_bytes(0), read_buffer(NULL), message(NULL) { } socket_connection::~socket_connection() { } void socket_connection::close(bool force) { if (message != NULL && (unsigned char*)message != read_buffer) freemsg(message); message = NULL; if (read_buffer) free(read_buffer); read_buffer = NULL; read_buffer_size = read_bytes = 0; status = connection_closed; } tcp_connection::tcp_connection(socket_connection_factory* f, int sfd, const struct sockaddr_in& addr) : socket_connection(f), write_notify(NULL), write_watcher(NULL), write_buffer_count(0), write_buffer_used(0), write_buffers(NULL) { transport = tcp_transport; socket_fd = sfd; memcpy(&client_addr, &addr, sizeof(struct sockaddr_in)); status = connection_opened; pthread_mutex_init(&write_mutex, NULL); } tcp_connection::~tcp_connection() { pthread_mutex_destroy(&write_mutex); } msg_read_result tcp_connection::read_message() { if (message == NULL) { // read message header ssize_t rbs = read(socket_fd, read_buffer+read_bytes, MSG_HEADER_LEN-read_bytes); if (rbs == 0) return msg_read_error; if (rbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { PLOG_ERROR("Can't read tcp message header"); return msg_read_error; } read_bytes += rbs; if (read_bytes < MSG_HEADER_LEN) return msg_read_partial; // not read whole header // the header is read, check if it is necessary to allocate space for 'big' message struct msg_header* header = (struct msg_header*)read_buffer; if (!verify_msg_header(header)) { LOG_ERROR("Invalid tcp message header("<<header->magic<<","<<header->type<<","<<header->msgid<<","<<header->length<<")"); return msg_read_error; } message = (struct csf_message*)read_buffer; if ((read_buffer_size-MSG_HEADER_LEN) < header->length) { // not enough space to hold 'big' message message = allocmsg(0, header->length); if (message == NULL) { LOG_ERROR("Can't allocate memory for large message("<<header->length<<" bytes)."); return msg_read_error; } memcpy(message, read_buffer, MSG_HEADER_LEN); } } // read message body struct msg_header* header = (struct msg_header*)read_buffer; size_t rlen = header->length - (read_bytes - MSG_HEADER_LEN); ssize_t rbs = read(socket_fd, ((unsigned char*)message)+read_bytes, rlen); if (rbs == 0) return msg_read_error; if (rbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { PLOG_ERROR("Can't read tcp message content"); return msg_read_error; } read_bytes += rbs; return rbs == ssize_t(rlen) ? msg_read_complete : msg_read_partial; } bool tcp_connection::write_message(const struct csf_message* msg) { // because it is possible to write message to this connection from other thread pthread_mutex_lock(&write_mutex); if (status == connection_closing || status == connection_closed) { pthread_mutex_unlock(&write_mutex); return false; } // check write buffer list if (write_buffer_count == 0) { // it is the first time to write message, init the buffer list write_buffers = (struct iovec*)malloc(sizeof(struct iovec)); if (write_buffers==NULL) { pthread_mutex_unlock(&write_mutex); LOG_ERROR("Can't allocate memory for 1 write buffers."); return false; } bzero(write_buffers, sizeof(struct iovec)); write_buffer_count = 1; write_buffer_used = 0; } else if (write_buffer_used == write_buffer_count) { // the buffer list is full struct iovec* new_buffers = (struct iovec*)realloc(write_buffers, sizeof(struct iovec)*write_buffer_count*2); if (new_buffers == NULL) { pthread_mutex_unlock(&write_mutex); LOG_ERROR("Can't allocate memory for "<<write_buffer_count*2<<" write buffers."); return false; } bzero(new_buffers+write_buffer_count, sizeof(struct iovec)*write_buffer_count); write_buffers = new_buffers; write_buffer_count *= 2; } // save the message struct iovec& msg_iovec = write_buffers[write_buffer_used]; msg_iovec.iov_base = (void*)msg; msg_iovec.iov_len = MSG_HEADER_LEN + msg->header.length; write_buffer_used++; // start up EV_WRITE event pthread_t self = pthread_self(); if (ev_userdata(event_loop) == (void*)self) { if (!ev_is_active(write_watcher)) ev_io_start(event_loop, write_watcher); } else ev_async_send(event_loop, write_notify); pthread_mutex_unlock(&write_mutex); return true; } bool tcp_connection::reply_success(const char* info) { if (status == connection_closing || status == connection_closed) return false; size_t body_len = sizeof(msg_result) + (info==NULL ? 0 : strlen(info)); csf_message* msg = allocmsg(MSG_RESULT, body_len); msg->result.errorno = ERR_SUCCESS; if (info != NULL) strcpy(msg->result.sval, info); bool retn = write_message(msg); if (!retn) freemsg(msg); return retn; } bool tcp_connection::reply_success(int32_t val) { if (status == connection_closing || status == connection_closed) return false; csf_message* msg = allocmsg(MSG_RESULT, sizeof(msg_result)); msg->result.ival = val; bool retn = write_message(msg); if (!retn) freemsg(msg); return retn; } bool tcp_connection::reply_success(double val) { if (status == connection_closing || status == connection_closed) return false; csf_message* msg = allocmsg(MSG_RESULT, sizeof(msg_result)); msg->result.dval = val; bool retn = write_message(msg); if (!retn) freemsg(msg); return retn; } bool tcp_connection::reply_error(uint16_t code, const char* info) { if (status == connection_closing || status == connection_closed) return false; size_t body_len = sizeof(msg_result) + (info==NULL ? 0 : strlen(info)); csf_message* msg = allocmsg(MSG_RESULT, body_len); msg->result.errorno = code; if (info != NULL) strcpy(msg->result.sval, info); DLOG_ERROR("Error Code: "<<code<<"; Error Message: "<<(info==NULL?"":info)); bool retn = write_message(msg); if (!retn) freemsg(msg); return retn; } void tcp_connection::close(bool force) { pthread_mutex_lock(&write_mutex); if (force == false && write_buffer_used != 0) { status = connection_closing; pthread_mutex_unlock(&write_mutex); return; } if (event_loop && read_watcher) { ev_io_stop(event_loop, read_watcher); free(read_watcher); read_watcher = NULL; } if (event_loop && write_watcher) { ev_io_stop(event_loop, write_watcher); free(write_watcher); write_watcher = NULL; } event_loop = NULL; if (socket_fd != 0) ::close(socket_fd); socket_fd = 0; for(int i=0; i<write_buffer_used; ++i) freemsg((struct csf_message*)(write_buffers[i].iov_base)); free(write_buffers); write_buffers = NULL; write_buffer_count = write_buffer_used = 0; socket_connection::close(force); pthread_mutex_unlock(&write_mutex); } udp_connection::udp_connection(socket_connection_factory* f, int sfd) : socket_connection(f) { transport = udp_transport; socket_fd = sfd; bzero(&client_addr, sizeof(struct sockaddr_in)); status = connection_opened; } udp_connection::~udp_connection() { } msg_read_result udp_connection::read_message() { socklen_t addr_size = sizeof(client_addr); int rbs = recvfrom(socket_fd, read_buffer, read_buffer_size, 0, (sockaddr*)&client_addr, &addr_size); if (rbs == 0) { PLOG_ERROR("Can't read udp message"); return msg_read_error; } if (rbs == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { PLOG_ERROR("Can't read udp message"); return msg_read_error; } return msg_read_partial; } read_bytes = rbs; struct msg_header* header = (struct msg_header*)read_buffer; if (!verify_msg_header(header) || header->length != (read_bytes-MSG_HEADER_LEN)) { LOG_ERROR("Invalid udp message: "<<read_bytes<<" bytes, header("<<header->magic<<","<<header->type<<","<<header->msgid<<","<<header->length<<")"); return msg_read_error; } message = (struct csf_message*)read_buffer; return msg_read_complete; } void udp_connection::close(bool force) { socket_connection::close(force); ev_break(event_loop, EVBREAK_ALL); } socket_connection_factory::~socket_connection_factory() { } ///////////////////////////////////////////////////////////////////////////// static void* event_loop_thread(void *arg); socket_server::socket_server() : sockets(NULL) { } socket_server::~socket_server() { } void socket_server::cleanup() { server_socket* sock = NULL; while( sockets ) { sock = sockets->next; delete sockets; sockets = sock; } } bool socket_server::create_server_socket(const char* name, const socket_settings& settings, socket_connection_factory* factory) { std::auto_ptr<server_socket> sock(new server_socket(name, factory)); sock->event_loop = ev_loop_new(EVFLAG_AUTO); if (!sock->create(settings)) return false; ev_set_userdata(sock->event_loop, sock.get()); if (sock->is_tcp() && !sock->create_worker_threads()) return false; pthread_attr_t attr; pthread_attr_init(&attr); if (int ret = pthread_create(&(sock->dispatch_thread), &attr, event_loop_thread, sock->event_loop) != 0) { LOG_ERROR("Can't create worker thread: "<<strerror(ret)); return false; } sock->next = sockets; sockets = sock.release(); return true; } void socket_server::wait_for_shutdown() { server_socket* svrsocket = sockets; while( svrsocket ) { pthread_join(svrsocket->dispatch_thread, NULL); svrsocket = svrsocket->next; } } ///////////////////////////////////////////////////////////////////////////// static void socket_connection_arrived(struct ev_loop *loop, struct ev_io *watcher, int revents); static void dispatched_socket_arrived(struct ev_loop* loop, struct ev_io* watcher, int revents); static void socket_data_arrived(struct ev_loop *loop, struct ev_io *watcher, int revents); static void socket_data_writable(struct ev_loop *loop, struct ev_io *watcher, int revents); static void start_socket_write(struct ev_loop* loop, struct ev_async* w, int revents); server_socket::server_socket(const char* name, socket_connection_factory* f) : socket_name(strdup(name)), factory(f), listeners(NULL), worker_threads(NULL), dispatch_thread(0), event_loop(NULL), last_dispatched(-1), next(NULL) { } server_socket::~server_socket() { cleanup(); } bool server_socket::create(const socket_settings& settings) { memcpy(&(this->settings), &settings, sizeof(socket_settings)); addrinfo hints; bzero(&hints, sizeof(addrinfo)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_UNSPEC; hints.ai_socktype = is_tcp() ? SOCK_STREAM : SOCK_DGRAM; char port_buf[NI_MAXSERV] = {0}; snprintf(port_buf, sizeof(port_buf), "%d", settings.port); addrinfo* ai = NULL; int error = getaddrinfo(settings.inter, port_buf, &hints, &ai); if (error != 0) { LOG_ERROR("getaddrinfo(): "<<(error==EAI_SYSTEM ? strerror(errno) : gai_strerror(error))); return false; } int sfd = -1; for (addrinfo* next = ai; next; next= next->ai_next) { if ((sfd = socket(next->ai_family, next->ai_socktype, next->ai_protocol)) == -1) continue; int flags; if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { PLOG_ERROR("Setting O_NONBLOCK on socket"); close(sfd); continue; } setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); if (is_tcp()) { error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); if (error != 0) PLOG_ERROR("setsockopt()"); linger ling = {0, 0}; error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); if (error != 0) PLOG_ERROR("setsockopt()"); error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); if (error != 0) PLOG_ERROR("setsockopt()"); } if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { if (errno != EADDRINUSE) { PLOG_ERROR("bind()"); goto error; } close(sfd); continue; } else { struct socket_listener* listener = (struct socket_listener*)malloc(sizeof(struct socket_listener)); listener->socket_fd = sfd; listener->svrsocket = this; if (is_tcp()) { listener->watcher.data = listener; if (listen(sfd, settings.backlog) == -1) { PLOG_ERROR("listen()"); goto error; } } else { udp_connection* conn = (udp_connection*)factory->create_connection(udp_transport, socket_name, sfd, NULL); if (conn == NULL) { LOG_ERROR("Fail to create udp connection from the factory."); goto error; } listener->watcher.data = conn; conn->read_watcher = &listener->watcher; conn->event_loop = event_loop; conn->read_buffer_size = settings.read_buffer_size; conn->read_bytes = 0; conn->read_buffer = (unsigned char*)malloc(conn->read_buffer_size); if (conn->read_buffer == NULL) { LOG_ERROR("Fail to allocate read buffer memory("<<conn->read_buffer_size<<" bytes) for new connection."); factory->destory_connection(conn); goto error; } } listener->next = listeners; listeners = listener; ev_io_init(&(listener->watcher), is_tcp()?socket_connection_arrived:socket_data_arrived, listener->socket_fd, EV_READ); ev_io_start(event_loop, &(listener->watcher)); } } freeaddrinfo(ai); usleep(1000); return true; error: close(sfd); freeaddrinfo(ai); return false; } bool server_socket::create_worker_threads() { worker_threads = (struct socket_thread*)malloc(sizeof(struct socket_thread)*settings.num_threads); bzero(worker_threads, sizeof(socket_thread)*settings.num_threads); for (int i = 0; i < settings.num_threads; i++) { int fds[2]; if (pipe(fds)) { PLOG_ERROR("Can't create notify pipe"); return false; } socket_thread* worker = &(worker_threads[i]); worker->notify_receive_fd = fds[0]; worker->notify_send_fd = fds[1]; worker->svrsocket = this; worker->event_loop = ev_loop_new(EVFLAG_AUTO); if( !worker->event_loop ) { LOG_ERROR("Can't allocate event loop for worker thread."); return false; } worker->notify_event = (struct ev_io*)malloc(sizeof(struct ev_io)); ev_io_init (worker->notify_event, dispatched_socket_arrived, worker->notify_receive_fd, EV_READ); worker->notify_event->data = worker; ev_io_start (worker->event_loop, worker->notify_event); pthread_attr_t attr; pthread_attr_init(&attr); if (int ret = pthread_create(&(worker->thread), &attr, event_loop_thread, worker->event_loop) != 0) { LOG_ERROR("Can't create worker thread: "<<strerror(ret)); return false; } } return true; } bool server_socket::dispatch_connection(const socket_connection_info& info) { int tid = (last_dispatched + 1) % settings.num_threads; last_dispatched = tid; socket_thread* thread = worker_threads + tid; size_t len = sizeof(struct socket_connection_info); if (write(thread->notify_send_fd, &info, len) != int(len)) { PLOG_ERROR("Fail to writing to connection notify pipe"); return false; } return true; } void server_socket::cleanup() { free((void*)socket_name); socket_name = NULL; socket_listener* sl = NULL; while( listeners ) { ev_io_stop(event_loop, &(listeners->watcher)); close(listeners->socket_fd); sl = listeners->next; free(listeners); listeners = sl; } if (worker_threads) { for(int i=0; i<settings.num_threads; ++i) { socket_thread* thread = &worker_threads[i]; ev_io_stop(thread->event_loop, thread->notify_event); free(thread->notify_event); ev_loop_destroy(thread->event_loop); close(thread->notify_receive_fd); close(thread->notify_send_fd); } free(worker_threads); worker_threads = NULL; } last_dispatched = -1; ev_loop_destroy(event_loop); event_loop = NULL; } //////////////////////////////////////////////////////////////////////// static void* event_loop_thread(void *arg) { struct ev_loop* event_loop = (struct ev_loop*)arg; if (ev_userdata(event_loop) == NULL) { pthread_t self = pthread_self(); ev_set_userdata(event_loop, (void*)self); } ev_run (event_loop, 0); return NULL; } static void socket_connection_arrived(struct ev_loop *loop, struct ev_io *watcher, int revents) { socket_listener* listener = (socket_listener*)watcher->data; struct socket_connection_info info; socklen_t addrlen = sizeof(info.addr); if ((info.socket_fd = accept(listener->socket_fd, (sockaddr*)&info.addr, &addrlen)) == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) PLOG_ERROR("Accept socket"); return; } int flags = 1; if ((flags = fcntl(info.socket_fd, F_GETFL, 0)) < 0 || fcntl(info.socket_fd, F_SETFL, flags | O_NONBLOCK) < 0) { PLOG_ERROR("Setting O_NONBLOCK on accepted socket"); close(info.socket_fd); return; } if (!listener->svrsocket->dispatch_connection(info)) { close(info.socket_fd); return; } } static void dispatched_socket_arrived(struct ev_loop* loop, struct ev_io* watcher, int revents) { socket_thread* thread = (socket_thread*)watcher->data; struct socket_connection_info info; size_t len = sizeof(struct socket_connection_info); if (read(watcher->fd, &info, len) != int(len)) { PLOG_ERROR("Can't read from connection notify pipe"); return; } socket_connection_factory* factory = thread->svrsocket->factory; tcp_connection* conn = (tcp_connection*)factory->create_connection(tcp_transport, thread->svrsocket->socket_name, info.socket_fd, &info.addr); if (conn == NULL) { LOG_ERROR("Fail to create tcp connection from the factory."); return; } conn->read_watcher = (struct ev_io*)malloc(sizeof(struct ev_io)); conn->read_watcher->data = conn; conn->write_notify = (struct ev_async*)malloc(sizeof(struct ev_async)); conn->write_notify->data = conn; conn->write_watcher = (struct ev_io*)malloc(sizeof(struct ev_io)); conn->write_watcher->data = conn; conn->read_buffer_size = thread->svrsocket->settings.read_buffer_size; conn->read_bytes = 0; conn->read_buffer = (unsigned char*)malloc(conn->read_buffer_size); if (conn->read_buffer == NULL) { LOG_ERROR("Fail to allocate read buffer memory("<<conn->read_buffer_size<<" bytes) for new connection."); factory->destory_connection(conn); return; } conn->event_loop = thread->event_loop; ev_io_init(conn->read_watcher, socket_data_arrived, info.socket_fd, EV_READ); ev_io_init(conn->write_watcher, socket_data_writable, info.socket_fd, EV_WRITE); ev_io_start(thread->event_loop, conn->read_watcher); ev_async_init(conn->write_notify, start_socket_write); ev_async_start(thread->event_loop, conn->write_notify); } static void reset_connection_read(socket_connection* conn) { conn->read_bytes = 0; if ((unsigned char*)(conn->message) != conn->read_buffer) freemsg(conn->message); conn->message = NULL; } static void socket_data_arrived(struct ev_loop *loop, struct ev_io *watcher, int revents) { socket_connection* conn = (socket_connection*)watcher->data; switch (conn->read_message()) { case msg_read_complete: conn->process_message(conn->message); reset_connection_read(conn); break; case msg_read_partial: return; case msg_read_error: default: { conn->close(true); conn->factory->destory_connection(conn); } break; } } static void start_socket_write(struct ev_loop* loop, struct ev_async* w, int revents) { tcp_connection* conn = (tcp_connection*)w->data; if (!ev_is_active(conn->write_watcher)) ev_io_start(conn->event_loop, conn->write_watcher); } static void socket_data_writable(struct ev_loop *loop, struct ev_io *watcher, int revents) { tcp_connection* conn = (tcp_connection*)watcher->data; // because it is possible to write message to this connection from other thread pthread_mutex_lock(&(conn->write_mutex)); if (conn->write_buffer_used == 0) return; int iovecs_written = 0, remainder = 0; ssize_t wbs = writev(conn->socket_fd, conn->write_buffers, conn->write_buffer_used); if (wbs == 0) { LOG_ERROR("Can't send message to remote client: socket("<<conn->socket_fd<<") is closed."); goto destroy_connection; } else if (wbs == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { PLOG_ERROR("Can't send message to remote client"); goto destroy_connection; } for (int i=0; i<conn->write_buffer_used; ++i) { struct iovec& vec = conn->write_buffers[i]; if (wbs > ssize_t(vec.iov_len)) { freemsg((struct csf_message*)vec.iov_base); vec.iov_base = NULL; iovecs_written++; wbs -= vec.iov_len; } else if (wbs == ssize_t(vec.iov_len)) { freemsg((struct csf_message*)vec.iov_base); vec.iov_base = NULL; iovecs_written++; break; } else { remainder = vec.iov_len - wbs; break; } } conn->write_buffer_used -= iovecs_written; if (conn->write_buffer_used == 0) { if (conn->status == connection_closing) goto destroy_connection; else ev_io_stop(conn->event_loop, conn->write_watcher); } else { memmove(conn->write_buffers, conn->write_buffers+iovecs_written, sizeof(struct iovec)*conn->write_buffer_used); bzero(conn->write_buffers+conn->write_buffer_used, sizeof(struct iovec)*iovecs_written); if (remainder > 0) { unsigned char* iov_base = (unsigned char*)conn->write_buffers[0].iov_base; memmove(iov_base, iov_base+(conn->write_buffers[0].iov_len-remainder), remainder); conn->write_buffers[0].iov_len = remainder; } } pthread_mutex_unlock(&(conn->write_mutex)); return; destroy_connection: pthread_mutex_unlock(&(conn->write_mutex)); conn->close(true); conn->factory->destory_connection(conn); }
socket_server.h
Description: Binary data
_______________________________________________ libev mailing list libev@lists.schmorp.de http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev