Hi, Was going to commit this, but cedric/bluebugs/etc suggested I send it here first to get a look over.
Attached is a patch which implements full threading support for ecore_con. When the ECORE_CON_USE_THREADS flag is passed in server_connect/add, it enables threaded serving mode. In this mode, all blocking I/O (receiving and sending on the sockets) spawns a new thread which then pipe writes the data back to main loop once the operation is complete. This will make all ecore_con operations and events asynchronous (wrt main loop) when enabled. In my preliminary tests, this worked flawlessly. I'll write back with some benchmarks after I get some sleep since I've spent the past 16 hours (it's now 10am) coding to try and get all of my last minute 1.0 merges in :) -- Mike Blumenkrantz Zentific: Our boolean values are huge.
Index: Ecore_Con.h =================================================================== --- Ecore_Con.h (revision 51113) +++ Ecore_Con.h (working copy) @@ -161,7 +161,9 @@ /** Use TLS */ ECORE_CON_USE_TLS = (1 << 6), /** Attempt to use the previously loaded certificate */ - ECORE_CON_LOAD_CERT = (1 << 7) + ECORE_CON_LOAD_CERT = (1 << 7), + /** Use threads for send/recv operations */ + ECORE_CON_USE_THREADS = (1 << 8) } Ecore_Con_Type; #define ECORE_CON_USE_SSL ECORE_CON_USE_SSL2 #define ECORE_CON_REMOTE_SYSTEM ECORE_CON_REMOTE_TCP Index: ecore_con_private.h =================================================================== --- ecore_con_private.h (revision 51113) +++ ecore_con_private.h (working copy) @@ -105,6 +105,9 @@ ECORE_MAGIC; int fd; Ecore_Con_Type type; + Eina_Bool threaded:1; + Ecore_Thread *recv_thread; + Ecore_Thread *send_thread; char *name; int port; char *path; Index: ecore_con.c =================================================================== --- ecore_con.c (revision 51113) +++ ecore_con.c (working copy) @@ -43,6 +43,8 @@ #include "Ecore_Con.h" #include "ecore_con_private.h" +typedef struct _Ecore_Con_Thread_Data Ecore_Con_Thread_Data; + static void _ecore_con_cb_tcp_connect(void *data, Ecore_Con_Info *info); static void _ecore_con_cb_udp_connect(void *data, Ecore_Con_Info *info); static void _ecore_con_cb_tcp_listen(void *data, Ecore_Con_Info *info); @@ -58,11 +60,25 @@ Ecore_Fd_Handler *fd_handler); static Eina_Bool _ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler); + +static void kill_server(Ecore_Con_Server *svr); +static void _ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl); +static void _ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl); +static void _ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl); +static void _ecore_con_svr_cancel(Ecore_Con_Client *cl); + +static void _ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr); +static void _ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr); +static void _ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr); + + static Eina_Bool _ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler); -static void _ecore_con_server_flush(Ecore_Con_Server *svr); -static void _ecore_con_client_flush(Ecore_Con_Client *cl); +static void _ecore_con_server_flush_end(Ecore_Con_Server *svr); +static void _ecore_con_client_flush_end(Ecore_Con_Client *cl); +static void _ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr); +static void _ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl); static void _ecore_con_event_client_add_free(void *data, void *ev); static void _ecore_con_event_client_del_free(void *data, void *ev); @@ -84,6 +100,12 @@ static int _ecore_con_init_count = 0; int _ecore_con_log_dom = -1; +struct _Ecore_Con_Thread_Data +{ + void *data; + int type; +}; + /** * @addtogroup Ecore_Con_Lib_Group Ecore Connection Library Functions * @@ -224,6 +246,8 @@ svr->clients = NULL; svr->ppid = getpid(); ecore_con_ssl_server_prepare(svr); + if ((type & ECORE_CON_SSL) & ECORE_CON_USE_THREADS == ECORE_CON_USE_THREADS) + svr->threaded = EINA_TRUE; type = compl_type & ECORE_CON_TYPE; @@ -333,6 +357,8 @@ svr->clients = NULL; svr->client_limit = -1; ecore_con_ssl_server_prepare(svr); + if ((type & ECORE_CON_SSL) & ECORE_CON_USE_THREADS == ECORE_CON_USE_THREADS) + svr->threaded = EINA_TRUE; type = compl_type & ECORE_CON_TYPE; @@ -624,7 +650,8 @@ } /** - * Flushes all pending data to the given server. Will return when done. + * Flushes all pending data to the given server. Will return when done unless + * the server is in threaded mode, in which case it will return immediately. * * @param svr The given server. */ @@ -637,7 +664,14 @@ return; } - _ecore_con_server_flush(svr); + if (svr->threaded) + ecore_long_run( + (Ecore_Thread_Heavy_Cb)_ecore_con_server_flush, + NULL, + (Ecore_Cb)_ecore_con_server_flush_end, + (Ecore_Cb)kill_server, svr, EINA_FALSE); + else + _ecore_con_server_flush(NULL, svr); } /** @@ -704,11 +738,17 @@ ecore_main_fd_handler_active_set( cl->fd_handler, ECORE_FD_READ | ECORE_FD_WRITE); - if(cl->server && ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP)) - sendto(cl->server->fd, data, size, 0, (struct sockaddr *)cl->client_addr, - cl->client_addr_len); - else if (cl->buf) + if(cl->server && (!cl->server->threaded) && + ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP)) { + if (!cl->server->threaded) + return (int)sendto(cl->server->fd, data, size, 0, + (struct sockaddr *)cl->client_addr, + cl->client_addr_len); + } + + if (cl->buf) + { unsigned char *newbuf; newbuf = realloc(cl->buf, cl->buf_size + size); @@ -870,7 +910,12 @@ return; } - _ecore_con_client_flush(cl); + if (cl->server->threaded) + ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush, + (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, + (Ecore_Cb)_ecore_con_client_flush_end, + (Ecore_Cb)kill_server, cl->server, EINA_FALSE); + else _ecore_con_client_flush(NULL, cl); } /** @@ -985,7 +1030,7 @@ t_start = ecore_time_get(); while ((svr->write_buf) && (!svr->dead)) { - _ecore_con_server_flush(svr); + _ecore_con_server_flush(NULL, svr); t = ecore_time_get(); if ((t - t_start) > 0.5) { @@ -1002,8 +1047,7 @@ EINA_LIST_FREE(svr->clients, cl) _ecore_con_client_free(cl); if ((svr->created) && (svr->path) && (svr->ppid == getpid())) - unlink( - svr->path); + unlink(svr->path); if (svr->fd >= 0) close(svr->fd); @@ -1034,7 +1078,7 @@ t_start = ecore_time_get(); while ((cl->buf) && (!cl->dead)) { - _ecore_con_client_flush(cl); + _ecore_con_client_flush(NULL, cl); t = ecore_time_get(); if ((t - t_start) > 0.5) { @@ -1544,77 +1588,76 @@ return ECORE_CALLBACK_RENEW; } -static Eina_Bool -_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler) +static void +_ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr) { - Ecore_Con_Server *svr; + ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, data, + _ecore_con_event_server_data_free, + NULL); +} - svr = data; - if (svr->dead) - return ECORE_CALLBACK_RENEW; +static void +_ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr) +{ + unsigned char *inbuf = NULL; + int inbuf_num = 0; + int tries; - if (svr->delete_me) - return ECORE_CALLBACK_RENEW; + for (tries = 0; tries < 16; tries++) + { + int num; + int lost_server = 1; + unsigned char buf[READBUFSIZ]; - if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ)) - { - unsigned char *inbuf = NULL; - int inbuf_num = 0; - int tries; + if (!(svr->type & ECORE_CON_SSL)) + { + if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0) + if ((num < 0) && (errno == EAGAIN)) + lost_server = 0; - if (svr->connecting && - (svr_try_connect(svr) != - ECORE_CON_CONNECTED)) - return ECORE_CALLBACK_RENEW; + } + else if (!(num = + ecore_con_ssl_server_read(svr, buf, + READBUFSIZ))) + lost_server = 0; - for (tries = 0; tries < 16; tries++) - { - int num; - int lost_server = 1; - unsigned char buf[READBUFSIZ]; + if (num < 1) + { + if (inbuf && !svr->delete_me) + { + Ecore_Con_Event_Server_Data *e; - if (!(svr->type & ECORE_CON_SSL)) - { - if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0) - if ((num < 0) && (errno == EAGAIN)) - lost_server = 0; + e = calloc(1, sizeof(Ecore_Con_Event_Server_Data)); + if (e) + { + svr->event_count++; + e->server = svr; + e->data = inbuf; + e->size = inbuf_num; - } - else if (!(num = - ecore_con_ssl_server_read(svr, buf, - READBUFSIZ))) - lost_server = 0; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e, + _ecore_con_event_server_data_free, + NULL); + else + ecore_thread_notify(thread, e); + } + } - if (num < 1) - { - if (inbuf && !svr->delete_me) - { - Ecore_Con_Event_Server_Data *e; + if (lost_server) + { + if (!thread) kill_server(svr); + else ecore_thread_cancel(thread); + } - e = calloc(1, sizeof(Ecore_Con_Event_Server_Data)); - if (e) - { - svr->event_count++; - e->server = svr; - e->data = inbuf; - e->size = inbuf_num; - ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e, - _ecore_con_event_server_data_free, - NULL); - } - } + break; + } - if (lost_server) - kill_server(svr); + inbuf = realloc(inbuf, inbuf_num + num); + memcpy(inbuf + inbuf_num, buf, num); + inbuf_num += num; + } - break; - } - - inbuf = realloc(inbuf, inbuf_num + num); - memcpy(inbuf + inbuf_num, buf, num); - inbuf_num += num; - } - /* #if USE_OPENSSL */ /* if (svr->fd_handler) */ /* { */ @@ -1624,21 +1667,11 @@ /* ecore_main_fd_handler_active_set(svr->fd_handler, ECORE_FD_WRITE); */ /* } */ /* #endif */ - } - else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE)) - { - if (svr->connecting && - !svr_try_connect (svr)) - return ECORE_CALLBACK_RENEW; - _ecore_con_server_flush(svr); - } - - return ECORE_CALLBACK_RENEW; } static Eina_Bool -_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler) +_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler) { Ecore_Con_Server *svr; @@ -1651,54 +1684,96 @@ if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ)) { - unsigned char buf[65536]; - int num = 0; + if (svr->connecting && + (svr_try_connect(svr) != + ECORE_CON_CONNECTED)) + return ECORE_CALLBACK_RENEW; - errno = 0; - num = read(svr->fd, buf, 65536); - if (num > 0) + if (svr->threaded) { - if (!svr->delete_me) - { - Ecore_Con_Event_Server_Data *e; - unsigned char *inbuf; + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_tcp_cb, + (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, + NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread; + } + else +try_nothread: + _ecore_con_cl_tcp_cb(NULL, svr); + } + else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE)) + { + if (svr->connecting && + !svr_try_connect (svr)) + return ECORE_CALLBACK_RENEW; - inbuf = malloc(num); - if(inbuf == NULL) - return 1; + if (svr->threaded) + { + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush, + NULL, + (Ecore_Cb)_ecore_con_server_flush_end, + (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread_flush; + } + else +try_nothread_flush: + _ecore_con_server_flush(NULL, svr); + } - memcpy(inbuf, buf, num); + return ECORE_CALLBACK_RENEW; +} - e = calloc(1, sizeof(Ecore_Con_Event_Server_Data)); - if (e) - { - svr->event_count++; - e->server = svr; - e->data = inbuf; - e->size = num; - ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e, - _ecore_con_event_server_data_free, - NULL); - } +static void +_ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr) +{ + unsigned char buf[65536]; + int num = 0; + + errno = 0; + num = read(svr->fd, buf, 65536); + if (num > 0) + { + if (!svr->delete_me) + { + Ecore_Con_Event_Server_Data *e; + unsigned char *inbuf; + + inbuf = malloc(num); + if(inbuf == NULL) + return; + + memcpy(inbuf, buf, num); + + e = calloc(1, sizeof(Ecore_Con_Event_Server_Data)); + if (e) + { + svr->event_count++; + e->server = svr; + e->data = inbuf; + e->size = num; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e, + _ecore_con_event_server_data_free, + NULL); + else + ecore_thread_notify(thread, e); } } - else if ((errno == EIO) || (errno == EBADF) || - (errno == EPIPE) || (errno == EINVAL) || - (errno == ENOSPC) || (errno == ECONNREFUSED)) - kill_server(svr); } - else if (ecore_main_fd_handler_active_get(fd_handler, - ECORE_FD_WRITE)) - _ecore_con_server_flush(svr); - - return ECORE_CALLBACK_RENEW; + else if ((errno == EIO) || (errno == EBADF) || + (errno == EPIPE) || (errno == EINVAL) || + (errno == ENOSPC) || (errno == ECONNREFUSED)) + { + if (!thread) kill_server(svr); + else ecore_thread_cancel(thread); + } } static Eina_Bool -_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler) +_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler) { Ecore_Con_Server *svr; - Ecore_Con_Client *cl = NULL; svr = data; if (svr->dead) @@ -1709,236 +1784,482 @@ if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ)) { - unsigned char buf[READBUFSIZ]; - unsigned char client_addr[256]; - unsigned int client_addr_len = sizeof(client_addr); - int num; + if (svr->threaded) + { + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_udp_cb, + (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, + NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread; + } + else + { +try_nothread: + _ecore_con_cl_udp_cb(NULL, svr); + } + } + else if (ecore_main_fd_handler_active_get(fd_handler, + ECORE_FD_WRITE)) + { + if (svr->threaded) + { + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush, + NULL, + (Ecore_Cb)_ecore_con_server_flush_end, + (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread_flush; + } + else +try_nothread_flush: + _ecore_con_server_flush(NULL, svr); + } - errno = 0; + return ECORE_CALLBACK_RENEW; +} + +static void +_ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl) +{ + if (data->type == ECORE_CON_EVENT_CLIENT_ADD) + ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD, + data->data, + _ecore_con_event_client_add_free, + NULL); + + else if (data->type == ECORE_CON_EVENT_CLIENT_DEL) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, + data->data, + _ecore_con_event_client_del_free, + NULL); + else + ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, + data->data, + _ecore_con_event_client_data_free, + NULL); + + free(data); +} + +static void +_ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl) +{ + unsigned char buf[READBUFSIZ]; + unsigned char client_addr[256]; + unsigned int client_addr_len = sizeof(client_addr); + int num; + Ecore_Con_Server *svr = cl->server; + + errno = 0; #ifdef _WIN32 - num = fcntl(svr->fd, F_SETFL, O_NONBLOCK); - if (num >= 0) - num = - recvfrom(svr->fd, buf, sizeof(buf), 0, - (struct sockaddr *)&client_addr, - &client_addr_len); + num = fcntl(svr->fd, F_SETFL, O_NONBLOCK); + if (num >= 0) + num = recvfrom(svr->fd, buf, sizeof(buf), 0, + (struct sockaddr *)&client_addr, + &client_addr_len); #else - num = - recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT, - (struct sockaddr *)&client_addr, - &client_addr_len); + num = recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT, + (struct sockaddr *)&client_addr, + &client_addr_len); #endif - if (num > 0) + if (num > 0) + { + if (!svr->delete_me) { - if (!svr->delete_me) + Ecore_Con_Event_Client_Data *e; + unsigned char *inbuf; + + /* Create a new client for use in the client data event */ + cl = calloc(1, sizeof(Ecore_Con_Client)); + if(cl == NULL) + return; + + cl->buf = NULL; + cl->fd = 0; + cl->fd_handler = NULL; + cl->server = svr; + cl->client_addr = calloc(1, client_addr_len); + cl->client_addr_len = client_addr_len; + if(cl->client_addr == NULL) { - Ecore_Con_Event_Client_Data *e; - unsigned char *inbuf; + free(cl); + return; + } - /* Create a new client for use in the client data event */ - cl = calloc(1, sizeof(Ecore_Con_Client)); - if(cl == NULL) - return ECORE_CALLBACK_RENEW; + memcpy(cl->client_addr, &client_addr, client_addr_len); + ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT); + svr->clients = eina_list_append(svr->clients, cl); - cl->buf = NULL; - cl->fd = 0; - cl->fd_handler = NULL; - cl->server = svr; - cl->client_addr = calloc(1, client_addr_len); - cl->client_addr_len = client_addr_len; - if(cl->client_addr == NULL) - { - free(cl); - return ECORE_CALLBACK_RENEW; - } + cl->ip = _ecore_con_pretty_ip(cl->client_addr, + cl->client_addr_len); - memcpy(cl->client_addr, &client_addr, client_addr_len); - ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT); - svr->clients = eina_list_append(svr->clients, cl); + inbuf = malloc(num); + if(inbuf == NULL) + { + free(cl->client_addr); + free(cl); + } - cl->ip = _ecore_con_pretty_ip(cl->client_addr, - cl->client_addr_len); + memcpy(inbuf, buf, num); - inbuf = malloc(num); - if(inbuf == NULL) + e = calloc(1, sizeof(Ecore_Con_Event_Client_Data)); + if (e) + { + svr->event_count++; + e->client = cl; + e->data = inbuf; + e->size = num; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e, + _ecore_con_event_client_data_free, + NULL); + else { - free(cl->client_addr); - free(cl); - return ECORE_CALLBACK_RENEW; - } + Ecore_Con_Thread_Data *td; - memcpy(inbuf, buf, num); + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) + { + WRN("Allocation failure in ecore_con threaded server data!"); + return; + } - e = calloc(1, sizeof(Ecore_Con_Event_Client_Data)); - if (e) - { - svr->event_count++; - e->client = cl; - e->data = inbuf; - e->size = num; - ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e, - _ecore_con_event_client_data_free, - NULL); + td->type = ECORE_CON_EVENT_CLIENT_DATA; + td->data = e; + ecore_thread_notify(thread, td); } + } - if(!cl->delete_me) + if(!cl->delete_me) + { + Ecore_Con_Event_Client_Add *add; + + add = calloc(1, sizeof(Ecore_Con_Event_Client_Add)); + if(!add) + return; + +/*cl->event_count++;*/ + add->client = cl; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD, + add, + _ecore_con_event_client_add_free, + NULL); + else { - Ecore_Con_Event_Client_Add *add; + Ecore_Con_Thread_Data *td; - add = calloc(1, sizeof(Ecore_Con_Event_Client_Add)); - if(add) + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) { -/*cl->event_count++;*/ - add->client = cl; - ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD, - add, - _ecore_con_event_client_add_free, - NULL); + WRN("Allocation failure in ecore_con threaded server data!"); + return; } + + td->type = ECORE_CON_EVENT_CLIENT_ADD; + td->data = e; + ecore_thread_notify(thread, td); } + } } - else if ((errno == EIO) || (errno == EBADF) || - (errno == EPIPE) || (errno == EINVAL) || - (errno == ENOSPC) || (errno == ECONNREFUSED)) + } + else if ((errno == EIO) || (errno == EBADF) || + (errno == EPIPE) || (errno == EINVAL) || + (errno == ENOSPC) || (errno == ECONNREFUSED)) + { + if (!svr->delete_me) { - if (!svr->delete_me) + /* we lost our client! */ + Ecore_Con_Event_Client_Del *e; + + e = calloc(1, sizeof(Ecore_Con_Event_Client_Del)); + if (e) { - /* we lost our client! */ - Ecore_Con_Event_Client_Del *e; + svr->event_count++; + /* be explicit here */ + e->client = NULL; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e, + _ecore_con_event_client_del_free, + NULL); + else + { + Ecore_Con_Thread_Data *td; - e = calloc(1, sizeof(Ecore_Con_Event_Client_Del)); - if (e) - { - svr->event_count++; - /* be explicit here */ - e->client = NULL; - ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e, - _ecore_con_event_client_del_free, - NULL); + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) + { + WRN("Allocation failure in ecore_con threaded server data!"); + return; + } + + td->type = ECORE_CON_EVENT_CLIENT_DEL; + td->data = e; + ecore_thread_notify(thread, td); } } + } - svr->dead = 1; - if (svr->fd_handler) - ecore_main_fd_handler_del(svr->fd_handler); + svr->dead = 1; + if (thread) + { + ecore_thread_cancel(thread); + return; + } + if (svr->fd_handler) + ecore_main_fd_handler_del(svr->fd_handler); - svr->fd_handler = NULL; - } + svr->fd_handler = NULL; } - else if (ecore_main_fd_handler_active_get(fd_handler, - ECORE_FD_WRITE)) - _ecore_con_client_flush(cl); - - return ECORE_CALLBACK_RENEW; } static Eina_Bool -_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler) +_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler) { - Ecore_Con_Client *cl; + Ecore_Con_Server *svr; + Ecore_Con_Client *cl = NULL; - cl = data; - if (cl->dead) + svr = data; + if (svr->dead) return ECORE_CALLBACK_RENEW; - if (cl->delete_me) + if (svr->delete_me) return ECORE_CALLBACK_RENEW; + if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ)) { - unsigned char *inbuf = NULL; - int inbuf_num = 0; - int tries; + if (svr->threaded) + { + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_udp_cb, + (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify, + NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread; + } + else + { +try_nothread: + _ecore_con_svr_udp_cb(NULL, cl); + } + } + else if (ecore_main_fd_handler_active_get(fd_handler, + ECORE_FD_WRITE)) + { + if (svr->threaded) + { + svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush, + (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, + (Ecore_Cb)_ecore_con_client_flush_end, + (Ecore_Cb)kill_server, svr, EINA_FALSE); + if (!svr->recv_thread) + goto try_nothread_flush; + } + else +try_nothread_flush: + _ecore_con_client_flush(NULL, cl); + } + return ECORE_CALLBACK_RENEW; +} - for (tries = 0; tries < 16; tries++) +static void +_ecore_con_svr_cancel(Ecore_Con_Client *cl) +{ + cl->dead = 1; + if (cl->fd_handler) + ecore_main_fd_handler_del(cl->fd_handler); + + cl->fd_handler = NULL; +} + +static void +_ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl) +{ + unsigned char *inbuf = NULL; + int inbuf_num = 0; + int tries; + Ecore_Con_Server *svr = cl->server; + + for (tries = 0; tries < 16; tries++) + { + int num; + int lost_client = 1; + unsigned char buf[READBUFSIZ]; + + errno = 0; + + if (!(cl->server->type & ECORE_CON_SSL)) { - int num; - int lost_client = 1; - unsigned char buf[READBUFSIZ]; + if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0) + if ((num < 0) && (errno == EAGAIN)) + lost_client = 0; - errno = 0; + } + else if (!(num = + ecore_con_ssl_client_read(cl, buf, + READBUFSIZ))) + lost_client = 0; - if (!(cl->server->type & ECORE_CON_SSL)) + if (num < 1) + { + if (inbuf && !cl->delete_me) { - if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0) - if ((num < 0) && (errno == EAGAIN)) - lost_client = 0; + Ecore_Con_Event_Client_Data *e; + e = calloc(1, sizeof(Ecore_Con_Event_Client_Data)); + if (e) + { + cl->event_count++; + e->client = cl; + e->data = inbuf; + e->size = inbuf_num; + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e, + _ecore_con_event_client_data_free, + NULL); + else + { + Ecore_Con_Thread_Data *td; + + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) + { + WRN("Allocation failure in ecore_con threaded server data!"); + return; + } + + td->type = ECORE_CON_EVENT_CLIENT_DATA; + td->data = e; + ecore_thread_notify(thread, td); + } + } } - else if (!(num = - ecore_con_ssl_client_read(cl, buf, - READBUFSIZ))) - lost_client = 0; - if (num < 1) + if (lost_client) { - if (inbuf && !cl->delete_me) + if (!cl->delete_me) { - Ecore_Con_Event_Client_Data *e; + /* we lost our client! */ + Ecore_Con_Event_Client_Del *e; - e = calloc(1, sizeof(Ecore_Con_Event_Client_Data)); + e = calloc(1, sizeof(Ecore_Con_Event_Client_Del)); if (e) { cl->event_count++; e->client = cl; - e->data = inbuf; - e->size = inbuf_num; - ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e, - _ecore_con_event_client_data_free, - NULL); - } - } + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, + e, + _ecore_con_event_client_del_free, + NULL); + else + { + Ecore_Con_Thread_Data *td; - if (lost_client) - { - if (!cl->delete_me) - { - /* we lost our client! */ - Ecore_Con_Event_Client_Del *e; + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) + { + WRN("Allocation failure in ecore_con threaded server data!"); + return; + } - e = calloc(1, sizeof(Ecore_Con_Event_Client_Del)); - if (e) - { - cl->event_count++; - e->client = cl; - ecore_event_add( - ECORE_CON_EVENT_CLIENT_DEL, - e, - _ecore_con_event_client_del_free, - NULL); + td->type = ECORE_CON_EVENT_CLIENT_DEL; + td->data = e; + ecore_thread_notify(thread, td); } } + } + if (thread) + ecore_thread_cancel(thread); + else + { cl->dead = 1; if (cl->fd_handler) ecore_main_fd_handler_del(cl->fd_handler); cl->fd_handler = NULL; } + } - break; - } - else - { - inbuf = realloc(inbuf, inbuf_num + num); - memcpy(inbuf + inbuf_num, buf, num); - inbuf_num += num; - } + break; } + else + { + inbuf = realloc(inbuf, inbuf_num + num); + memcpy(inbuf + inbuf_num, buf, num); + inbuf_num += num; + } } +} + +static Eina_Bool +_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler) +{ + Ecore_Con_Client *cl; + + cl = data; + if (cl->dead) + return ECORE_CALLBACK_RENEW; + + if (cl->delete_me) + return ECORE_CALLBACK_RENEW; + + if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ)) + { + if (cl->server->threaded) + { + cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_tcp_cb, + (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify, + NULL, + (Ecore_Cb)_ecore_con_svr_cancel, + cl, EINA_FALSE); + if (!cl->server->recv_thread) + goto try_nothread; + } + else + { +try_nothread: + _ecore_con_svr_udp_cb(NULL, cl); + } + } else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE)) - _ecore_con_client_flush(cl); + { + if (cl->server->threaded) + { + cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush, + (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, + (Ecore_Cb)_ecore_con_client_flush_end, + (Ecore_Cb)_ecore_con_svr_cancel, cl, + EINA_FALSE); + if (!cl->server->recv_thread) + goto try_nothread_flush; + } + else +try_nothread_flush: + _ecore_con_client_flush(NULL, cl); + } return ECORE_CALLBACK_RENEW; } static void -_ecore_con_server_flush(Ecore_Con_Server *svr) +_ecore_con_server_flush_end(Ecore_Con_Server *svr) { + if (svr->write_buf_offset < svr->write_buf_size) + return; + + svr->write_buf_size = 0; + svr->write_buf_offset = 0; + free(svr->write_buf); + svr->write_buf = NULL; + if (svr->fd_handler) + ecore_main_fd_handler_active_set(svr->fd_handler, + ECORE_FD_READ); +} + +static void +_ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr) +{ int count, num; if (!svr->write_buf) @@ -1963,11 +2284,18 @@ if (count < 1) { /* we lost our server! */ - kill_server(svr); + if (!thread) + return kill_server(svr); + + ecore_thread_cancel(thread); return; } svr->write_buf_offset += count; + + if (thread) + return; + if (svr->write_buf_offset >= svr->write_buf_size) { svr->write_buf_size = 0; @@ -1981,8 +2309,22 @@ } static void -_ecore_con_client_flush(Ecore_Con_Client *cl) +_ecore_con_client_flush_end(Ecore_Con_Client *cl) { + if (cl->buf_offset < cl->buf_size) + return; + + cl->buf_size = 0; + cl->buf_offset = 0; + free(cl->buf); + cl->buf = NULL; + if (cl->fd_handler) + ecore_main_fd_handler_active_set(cl->fd_handler, ECORE_FD_READ); +} + +static void +_ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl) +{ int count, num; if (!cl->buf) @@ -2009,11 +2351,32 @@ { cl->event_count++; e->client = cl; - ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e, - _ecore_con_event_client_del_free, NULL); + if (!thread) + ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e, + _ecore_con_event_client_del_free, NULL); + else + { + Ecore_Con_Thread_Data *td; + + if (!(td = malloc(sizeof(Ecore_Con_Thread_Data)))) + { + WRN("Allocation failure in ecore_con threaded server data!"); + return; + } + + td->type = ECORE_CON_EVENT_CLIENT_DEL; + td->data = e; + ecore_thread_notify(thread, td); + } } cl->dead = 1; + if (thread) + { + ecore_thread_cancel(thread); + return; + } + if (cl->fd_handler) ecore_main_fd_handler_del(cl->fd_handler); @@ -2024,6 +2387,8 @@ } cl->buf_offset += count; + if (thread) + return; if (cl->buf_offset >= cl->buf_size) { cl->buf_size = 0;
------------------------------------------------------------------------------ This SF.net email is sponsored by Make an app they can't live without Enter the BlackBerry Developer Challenge http://p.sf.net/sfu/RIM-dev2dev
_______________________________________________ enlightenment-devel mailing list enlightenment-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/enlightenment-devel