Hi,

I was going to commit this, but cedric, bluebugs, and others suggested I send
it here first for review.

Attached is a patch which implements full threading support for ecore_con.
When the ECORE_CON_USE_THREADS flag is passed to server_connect/add, it enables
threaded serving mode.  In this mode, all blocking I/O (receiving and sending
on the sockets) spawns a new thread, which writes the data back to the main
loop through a pipe once the operation is complete.  This makes all ecore_con
operations and events asynchronous (with respect to the main loop) when enabled.

In my preliminary tests, this worked flawlessly.  I'll write back with some
benchmarks after I get some sleep since I've spent the past 16 hours
(it's now 10am) coding to try and get all of my last minute 1.0 merges in :)

-- 
Mike Blumenkrantz
Zentific: Our boolean values are huge.
Index: Ecore_Con.h
===================================================================
--- Ecore_Con.h	(revision 51113)
+++ Ecore_Con.h	(working copy)
@@ -161,7 +161,9 @@
    /** Use TLS */
    ECORE_CON_USE_TLS = (1 << 6),
    /** Attempt to use the previously loaded certificate */
-   ECORE_CON_LOAD_CERT = (1 << 7)
+   ECORE_CON_LOAD_CERT = (1 << 7),
+   /** Use threads for send/recv operations */
+   ECORE_CON_USE_THREADS = (1 << 8)
 } Ecore_Con_Type;
 #define ECORE_CON_USE_SSL ECORE_CON_USE_SSL2
 #define ECORE_CON_REMOTE_SYSTEM ECORE_CON_REMOTE_TCP
Index: ecore_con_private.h
===================================================================
--- ecore_con_private.h	(revision 51113)
+++ ecore_con_private.h	(working copy)
@@ -105,6 +105,9 @@
    ECORE_MAGIC;
    int fd;
    Ecore_Con_Type type;
+   Eina_Bool threaded:1;
+   Ecore_Thread *recv_thread;
+   Ecore_Thread *send_thread;
    char *name;
    int port;
    char *path;
Index: ecore_con.c
===================================================================
--- ecore_con.c	(revision 51113)
+++ ecore_con.c	(working copy)
@@ -43,6 +43,8 @@
 #include "Ecore_Con.h"
 #include "ecore_con_private.h"
 
+typedef struct _Ecore_Con_Thread_Data Ecore_Con_Thread_Data;
+
 static void      _ecore_con_cb_tcp_connect(void *data, Ecore_Con_Info *info);
 static void      _ecore_con_cb_udp_connect(void *data, Ecore_Con_Info *info);
 static void      _ecore_con_cb_tcp_listen(void *data, Ecore_Con_Info *info);
@@ -58,11 +60,25 @@
                                            Ecore_Fd_Handler *fd_handler);
 static Eina_Bool _ecore_con_svr_udp_handler(void *data,
                                             Ecore_Fd_Handler *fd_handler);
+
+static void kill_server(Ecore_Con_Server *svr);
+static void _ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl);
+static void _ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl);
+static void _ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl);
+static void _ecore_con_svr_cancel(Ecore_Con_Client *cl);
+
+static void _ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void _ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void _ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr);
+
+
 static Eina_Bool _ecore_con_svr_cl_handler(void *data,
                                            Ecore_Fd_Handler *fd_handler);
 
-static void      _ecore_con_server_flush(Ecore_Con_Server *svr);
-static void      _ecore_con_client_flush(Ecore_Con_Client *cl);
+static void      _ecore_con_server_flush_end(Ecore_Con_Server *svr);
+static void      _ecore_con_client_flush_end(Ecore_Con_Client *cl);
+static void      _ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void      _ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl);
 
 static void      _ecore_con_event_client_add_free(void *data, void *ev);
 static void      _ecore_con_event_client_del_free(void *data, void *ev);
@@ -84,6 +100,12 @@
 static int _ecore_con_init_count = 0;
 int _ecore_con_log_dom = -1;
 
+struct _Ecore_Con_Thread_Data
+{
+   void *data;
+   int type;
+};
+
 /**
  * @addtogroup Ecore_Con_Lib_Group Ecore Connection Library Functions
  *
@@ -224,6 +246,8 @@
    svr->clients = NULL;
    svr->ppid = getpid();
    ecore_con_ssl_server_prepare(svr);
+   if (compl_type & ECORE_CON_USE_THREADS) /* flag bit as requested by the caller */
+     svr->threaded = EINA_TRUE;
 
    type = compl_type & ECORE_CON_TYPE;
 
@@ -333,6 +357,8 @@
    svr->clients = NULL;
    svr->client_limit = -1;
    ecore_con_ssl_server_prepare(svr);
+   if (compl_type & ECORE_CON_USE_THREADS) /* flag bit as requested by the caller */
+     svr->threaded = EINA_TRUE;
 
    type = compl_type & ECORE_CON_TYPE;
 
@@ -624,7 +650,8 @@
 }
 
 /**
- * Flushes all pending data to the given server. Will return when done.
+ * Flushes all pending data to the given server. Will return when done unless
+ * the server is in threaded mode, in which case it will return immediately.
  *
  * @param   svr           The given server.
  */
@@ -637,7 +664,14 @@
         return;
      }
 
-   _ecore_con_server_flush(svr);
+   if (svr->threaded)
+     ecore_long_run(
+       (Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+       NULL,
+       (Ecore_Cb)_ecore_con_server_flush_end,
+       (Ecore_Cb)kill_server, svr, EINA_FALSE);
+   else
+     _ecore_con_server_flush(NULL, svr);
 }
 
 /**
@@ -704,11 +738,17 @@
       ecore_main_fd_handler_active_set(
          cl->fd_handler, ECORE_FD_READ | ECORE_FD_WRITE);
 
-   if(cl->server && ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP))
-      sendto(cl->server->fd, data, size, 0, (struct sockaddr *)cl->client_addr,
-             cl->client_addr_len);
-   else if (cl->buf)
+   if(cl->server && (!cl->server->threaded) &&
+     ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP))
      {
+        if (!cl->server->threaded)
+          return (int)sendto(cl->server->fd, data, size, 0,
+                 (struct sockaddr *)cl->client_addr,
+                 cl->client_addr_len);
+     }
+
+   if (cl->buf)
+     {
         unsigned char *newbuf;
 
         newbuf = realloc(cl->buf, cl->buf_size + size);
@@ -870,7 +910,12 @@
         return;
      }
 
-   _ecore_con_client_flush(cl);
+   if (cl->server->threaded) /* worker, end and cancel callbacks all take the client */
+     ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+                    (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify, /* NOTE(review): confirm notify cb matches what the flush worker posts */
+                    (Ecore_Cb)_ecore_con_client_flush_end,
+                    (Ecore_Cb)_ecore_con_svr_cancel, cl, EINA_FALSE);
+   else _ecore_con_client_flush(NULL, cl);
 }
 
 /**
@@ -985,7 +1030,7 @@
    t_start = ecore_time_get();
    while ((svr->write_buf) && (!svr->dead))
      {
-        _ecore_con_server_flush(svr);
+        _ecore_con_server_flush(NULL, svr);
         t = ecore_time_get();
         if ((t - t_start) > 0.5)
           {
@@ -1002,8 +1047,7 @@
    EINA_LIST_FREE(svr->clients, cl)
    _ecore_con_client_free(cl);
    if ((svr->created) && (svr->path) && (svr->ppid == getpid()))
-      unlink(
-         svr->path);
+      unlink(svr->path);
 
    if (svr->fd >= 0)
       close(svr->fd);
@@ -1034,7 +1078,7 @@
    t_start = ecore_time_get();
    while ((cl->buf) && (!cl->dead))
      {
-        _ecore_con_client_flush(cl);
+        _ecore_con_client_flush(NULL, cl);
         t = ecore_time_get();
         if ((t - t_start) > 0.5)
           {
@@ -1544,77 +1588,76 @@
    return ECORE_CALLBACK_RENEW;
 }
 
-static Eina_Bool
-_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+static void
+_ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr)
 {
-   Ecore_Con_Server *svr;
+   ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, data,
+                   _ecore_con_event_server_data_free,
+                   NULL);
+}
 
-   svr = data;
-   if (svr->dead)
-      return ECORE_CALLBACK_RENEW;
+static void
+_ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
+    unsigned char *inbuf = NULL;
+    int inbuf_num = 0;
+    int tries;
 
-   if (svr->delete_me)
-      return ECORE_CALLBACK_RENEW;
+    for (tries = 0; tries < 16; tries++)
+      {
+         int num;
+         int lost_server = 1;
+         unsigned char buf[READBUFSIZ];
 
-   if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
-     {
-        unsigned char *inbuf = NULL;
-        int inbuf_num = 0;
-        int tries;
+         if (!(svr->type & ECORE_CON_SSL))
+           {
+              if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0)
+                 if ((num < 0) && (errno == EAGAIN))
+                    lost_server = 0;
 
-        if (svr->connecting &&
-            (svr_try_connect(svr) !=
-             ECORE_CON_CONNECTED))
-           return ECORE_CALLBACK_RENEW;
+           }
+         else if (!(num =
+                       ecore_con_ssl_server_read(svr, buf,
+                                                 READBUFSIZ)))
+            lost_server = 0;
 
-        for (tries = 0; tries < 16; tries++)
-          {
-             int num;
-             int lost_server = 1;
-             unsigned char buf[READBUFSIZ];
+         if (num < 1)
+           {
+              if (inbuf && !svr->delete_me)
+                {
+                   Ecore_Con_Event_Server_Data *e;
 
-             if (!(svr->type & ECORE_CON_SSL))
-               {
-                  if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0)
-                     if ((num < 0) && (errno == EAGAIN))
-                        lost_server = 0;
+                   e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
+                   if (e)
+                     {
+                        svr->event_count++;
+                        e->server = svr;
+                        e->data = inbuf;
+                        e->size = inbuf_num;
 
-               }
-             else if (!(num =
-                           ecore_con_ssl_server_read(svr, buf,
-                                                     READBUFSIZ)))
-                lost_server = 0;
+                        if (!thread)
+                          ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
+                                          _ecore_con_event_server_data_free,
+                                          NULL);
+                        else
+                          ecore_thread_notify(thread, e);
+                     }
+                }
 
-             if (num < 1)
-               {
-                  if (inbuf && !svr->delete_me)
-                    {
-                       Ecore_Con_Event_Server_Data *e;
+              if (lost_server)
+                {
+                   if (!thread) kill_server(svr);
+                   else ecore_thread_cancel(thread);
+                }
 
-                       e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
-                       if (e)
-                         {
-                            svr->event_count++;
-                            e->server = svr;
-                            e->data = inbuf;
-                            e->size = inbuf_num;
-                            ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
-                                            _ecore_con_event_server_data_free,
-                                            NULL);
-                         }
-                    }
+              break;
+           }
 
-                  if (lost_server)
-                     kill_server(svr);
+         inbuf = realloc(inbuf, inbuf_num + num);
+         memcpy(inbuf + inbuf_num, buf, num);
+         inbuf_num += num;
+      }
 
-                  break;
-               }
-
-             inbuf = realloc(inbuf, inbuf_num + num);
-             memcpy(inbuf + inbuf_num, buf, num);
-             inbuf_num += num;
-          }
-
 /* #if USE_OPENSSL */
 /* if (svr->fd_handler) */
 /*   { */
@@ -1624,21 +1667,11 @@
 /*        ecore_main_fd_handler_active_set(svr->fd_handler, ECORE_FD_WRITE); */
 /*   } */
 /* #endif */
-     }
-   else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE))
-     {
-        if (svr->connecting &&
-            !svr_try_connect (svr))
-           return ECORE_CALLBACK_RENEW;
 
-        _ecore_con_server_flush(svr);
-     }
-
-   return ECORE_CALLBACK_RENEW;
 }
 
 static Eina_Bool
-_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
 {
    Ecore_Con_Server *svr;
 
@@ -1651,54 +1684,96 @@
 
    if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
      {
-        unsigned char buf[65536];
-        int num = 0;
+        if (svr->connecting &&
+            (svr_try_connect(svr) !=
+             ECORE_CON_CONNECTED))
+           return ECORE_CALLBACK_RENEW;
 
-        errno = 0;
-        num = read(svr->fd, buf, 65536);
-        if (num > 0)
+        if (svr->threaded)
           {
-             if (!svr->delete_me)
-               {
-                  Ecore_Con_Event_Server_Data *e;
-                  unsigned char *inbuf;
+             svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_tcp_cb,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+                                               NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->recv_thread)
+               goto try_nothread;
+          }
+        else
+try_nothread:
+          _ecore_con_cl_tcp_cb(NULL, svr);
+     }
+   else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE))
+     {
+        if (svr->connecting &&
+            !svr_try_connect (svr))
+           return ECORE_CALLBACK_RENEW;
 
-                  inbuf = malloc(num);
-                  if(inbuf == NULL)
-                     return 1;
+        if (svr->threaded)
+          { /* a flush is a send: keep its handle in send_thread, not recv_thread */
+             svr->send_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+                                               NULL,
+                                               (Ecore_Cb)_ecore_con_server_flush_end,
+                                               (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->send_thread) /* no thread handle returned: flush inline */
+               goto try_nothread_flush;
+          }
+        else
+try_nothread_flush:
+          _ecore_con_server_flush(NULL, svr);
+     }
 
-                  memcpy(inbuf, buf, num);
+   return ECORE_CALLBACK_RENEW;
+}
 
-                  e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
-                  if (e)
-                    {
-                       svr->event_count++;
-                       e->server = svr;
-                       e->data = inbuf;
-                       e->size = num;
-                       ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
-                                       _ecore_con_event_server_data_free,
-                                       NULL);
-                    }
+static void
+_ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
+   unsigned char buf[65536];
+   int num = 0;
+
+   errno = 0;
+   num = read(svr->fd, buf, 65536);
+   if (num > 0)
+     {
+        if (!svr->delete_me)
+          {
+             Ecore_Con_Event_Server_Data *e;
+             unsigned char *inbuf;
+
+             inbuf = malloc(num);
+             if(inbuf == NULL)
+                return;
+
+             memcpy(inbuf, buf, num);
+
+             e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
+             if (e)
+               {
+                  svr->event_count++;
+                  e->server = svr;
+                  e->data = inbuf;
+                  e->size = num;
+                  if (!thread)
+                    ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
+                                    _ecore_con_event_server_data_free,
+                                    NULL);
+                  else
+                    ecore_thread_notify(thread, e);
                }
           }
-        else if ((errno == EIO) || (errno == EBADF) ||
-                 (errno == EPIPE) || (errno == EINVAL) ||
-                 (errno == ENOSPC) || (errno == ECONNREFUSED))
-           kill_server(svr);
      }
-   else if (ecore_main_fd_handler_active_get(fd_handler,
-                                             ECORE_FD_WRITE))
-      _ecore_con_server_flush(svr);
-
-   return ECORE_CALLBACK_RENEW;
+   else if ((errno == EIO) || (errno == EBADF) ||
+            (errno == EPIPE) || (errno == EINVAL) ||
+            (errno == ENOSPC) || (errno == ECONNREFUSED))
+      {
+         if (!thread) kill_server(svr);
+         else ecore_thread_cancel(thread);
+      }
 }
 
 static Eina_Bool
-_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
 {
    Ecore_Con_Server *svr;
-   Ecore_Con_Client *cl = NULL;
 
    svr = data;
    if (svr->dead)
@@ -1709,236 +1784,482 @@
 
    if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
      {
-        unsigned char buf[READBUFSIZ];
-        unsigned char client_addr[256];
-        unsigned int client_addr_len = sizeof(client_addr);
-        int num;
+        if (svr->threaded)
+          {
+             svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_udp_cb,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+                                               NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->recv_thread)
+               goto try_nothread;
+          }
+        else
+          {
+try_nothread:
+             _ecore_con_cl_udp_cb(NULL, svr);
+          }
+     }
+   else if (ecore_main_fd_handler_active_get(fd_handler,
+                                             ECORE_FD_WRITE))
+     {
+        if (svr->threaded)
+          { /* a flush is a send: keep its handle in send_thread, not recv_thread */
+             svr->send_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+                                               NULL,
+                                               (Ecore_Cb)_ecore_con_server_flush_end,
+                                               (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->send_thread) /* no thread handle returned: flush inline */
+               goto try_nothread_flush;
+          }
+        else
+try_nothread_flush:
+          _ecore_con_server_flush(NULL, svr);
+     }
 
-        errno = 0;
+   return ECORE_CALLBACK_RENEW;
+}
+
+static void
+_ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl)
+{
+   if (data->type == ECORE_CON_EVENT_CLIENT_ADD)
+     ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
+                     data->data,
+                     _ecore_con_event_client_add_free,
+                     NULL);
+
+   else if (data->type == ECORE_CON_EVENT_CLIENT_DEL)
+     ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL,
+                     data->data,
+                     _ecore_con_event_client_del_free,
+                     NULL);
+   else 
+     ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA,
+                     data->data,
+                     _ecore_con_event_client_data_free,
+                     NULL);
+
+   free(data);
+}
+
+static void
+_ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl)
+{
+   unsigned char buf[READBUFSIZ];
+   unsigned char client_addr[256];
+   unsigned int client_addr_len = sizeof(client_addr);
+   int num;
+   Ecore_Con_Server *svr = cl->server;
+
+   errno = 0;
 #ifdef _WIN32
-        num = fcntl(svr->fd, F_SETFL, O_NONBLOCK);
-        if (num >= 0)
-           num =
-              recvfrom(svr->fd, buf, sizeof(buf), 0,
-                       (struct sockaddr *)&client_addr,
-                       &client_addr_len);
+   num = fcntl(svr->fd, F_SETFL, O_NONBLOCK);
+   if (num >= 0)
+      num = recvfrom(svr->fd, buf, sizeof(buf), 0,
+                  (struct sockaddr *)&client_addr,
+                  &client_addr_len);
 
 #else
-        num =
-              recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT,
-                    (struct sockaddr *)&client_addr,
-                    &client_addr_len);
+   num = recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT,
+               (struct sockaddr *)&client_addr,
+               &client_addr_len);
 #endif
 
-        if (num > 0)
+   if (num > 0)
+     {
+        if (!svr->delete_me)
           {
-             if (!svr->delete_me)
+             Ecore_Con_Event_Client_Data *e;
+             unsigned char *inbuf;
+
+             /* Create a new client for use in the client data event */
+             cl = calloc(1, sizeof(Ecore_Con_Client));
+             if(cl == NULL)
+                return;
+
+             cl->buf = NULL;
+             cl->fd = 0;
+             cl->fd_handler = NULL;
+             cl->server = svr;
+             cl->client_addr = calloc(1, client_addr_len);
+             cl->client_addr_len = client_addr_len;
+             if(cl->client_addr == NULL)
                {
-                  Ecore_Con_Event_Client_Data *e;
-                  unsigned char *inbuf;
+                  free(cl);
+                  return;
+               }
 
-                  /* Create a new client for use in the client data event */
-                  cl = calloc(1, sizeof(Ecore_Con_Client));
-                  if(cl == NULL)
-                     return ECORE_CALLBACK_RENEW;
+             memcpy(cl->client_addr, &client_addr, client_addr_len);
+             ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT);
+             svr->clients = eina_list_append(svr->clients, cl);
 
-                  cl->buf = NULL;
-                  cl->fd = 0;
-                  cl->fd_handler = NULL;
-                  cl->server = svr;
-                  cl->client_addr = calloc(1, client_addr_len);
-                  cl->client_addr_len = client_addr_len;
-                  if(cl->client_addr == NULL)
-                    {
-                       free(cl);
-                       return ECORE_CALLBACK_RENEW;
-                    }
+             cl->ip = _ecore_con_pretty_ip(cl->client_addr,
+                                           cl->client_addr_len);
 
-                  memcpy(cl->client_addr, &client_addr, client_addr_len);
-                  ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT);
-                  svr->clients = eina_list_append(svr->clients, cl);
+             if (!(inbuf = malloc(num)))
+               { /* NOTE(review): cl was already appended to svr->clients above */
+                  free(cl->client_addr);
+                  free(cl);
+                  return; /* inbuf is NULL and cl is freed: nothing below may run */
+               }
 
-                  cl->ip = _ecore_con_pretty_ip(cl->client_addr,
-                                                cl->client_addr_len);
+             memcpy(inbuf, buf, num);
 
-                  inbuf = malloc(num);
-                  if(inbuf == NULL)
+             e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+             if (e)
+               {
+                  svr->event_count++;
+                  e->client = cl;
+                  e->data = inbuf;
+                  e->size = num;
+                  if (!thread)
+                    ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
+                               _ecore_con_event_client_data_free,
+                               NULL);
+                  else
                     {
-                       free(cl->client_addr);
-                       free(cl);
-                       return ECORE_CALLBACK_RENEW;
-                    }
+                       Ecore_Con_Thread_Data *td;
 
-                  memcpy(inbuf, buf, num);
+                       if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+                         {
+                            WRN("Allocation failure in ecore_con threaded server data!");
+                            return;
+                         }
 
-                  e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
-                  if (e)
-                    {
-                       svr->event_count++;
-                       e->client = cl;
-                       e->data = inbuf;
-                       e->size = num;
-                            ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
-                                       _ecore_con_event_client_data_free,
-                                       NULL);
+                       td->type = ECORE_CON_EVENT_CLIENT_DATA;
+                       td->data = e;
+                       ecore_thread_notify(thread, td);
                     }
+               }
 
-                  if(!cl->delete_me)
+             if(!cl->delete_me)
+               {
+                  Ecore_Con_Event_Client_Add *add;
+
+                  add = calloc(1, sizeof(Ecore_Con_Event_Client_Add));
+                  if(!add)
+                    return;
+
+/*cl->event_count++;*/
+                  add->client = cl;
+                  if (!thread)
+                    ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
+                                    add,
+                                    _ecore_con_event_client_add_free,
+                                    NULL);
+                  else
                     {
-                       Ecore_Con_Event_Client_Add *add;
+                       Ecore_Con_Thread_Data *td;
 
-                       add = calloc(1, sizeof(Ecore_Con_Event_Client_Add));
-                       if(add)
+                       if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
                          {
-/*cl->event_count++;*/
-                            add->client = cl;
-                            ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
-                                            add,
-                                            _ecore_con_event_client_add_free,
-                                            NULL);
+                            WRN("Allocation failure in ecore_con threaded server data!");
+                            return;
                          }
+
+                       td->type = ECORE_CON_EVENT_CLIENT_ADD;
+                       td->data = add; /* the add event, not the data event 'e' handled above */
+                       ecore_thread_notify(thread, td);
                     }
+
                }
           }
-        else if ((errno == EIO) || (errno == EBADF) ||
-                 (errno == EPIPE) || (errno == EINVAL) ||
-                 (errno == ENOSPC) || (errno == ECONNREFUSED))
+     }
+   else if ((errno == EIO) || (errno == EBADF) ||
+            (errno == EPIPE) || (errno == EINVAL) ||
+            (errno == ENOSPC) || (errno == ECONNREFUSED))
+     {
+        if (!svr->delete_me)
           {
-             if (!svr->delete_me)
+             /* we lost our client! */
+             Ecore_Con_Event_Client_Del *e;
+
+             e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
+             if (e)
                {
-                  /* we lost our client! */
-                  Ecore_Con_Event_Client_Del *e;
+                  svr->event_count++;
+                  /* be explicit here */
+                  e->client = NULL;
+                  if (!thread)
+                    ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
+                               _ecore_con_event_client_del_free,
+                               NULL);
+                  else
+                    {
+                       Ecore_Con_Thread_Data *td;
 
-                  e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
-                  if (e)
-                    {
-                       svr->event_count++;
-                       /* be explicit here */
-                       e->client = NULL;
-                            ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
-                                       _ecore_con_event_client_del_free,
-                                       NULL);
+                       if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+                         {
+                            WRN("Allocation failure in ecore_con threaded server data!");
+                            return;
+                         }
+
+                       td->type = ECORE_CON_EVENT_CLIENT_DEL;
+                       td->data = e;
+                       ecore_thread_notify(thread, td);
                     }
                }
+          }
 
-             svr->dead = 1;
-             if (svr->fd_handler)
-                ecore_main_fd_handler_del(svr->fd_handler);
+        svr->dead = 1;
+        if (thread)
+          {
+             ecore_thread_cancel(thread);
+             return;
+          }
+        if (svr->fd_handler)
+           ecore_main_fd_handler_del(svr->fd_handler);
 
-             svr->fd_handler = NULL;
-          }
+        svr->fd_handler = NULL;
      }
-   else if (ecore_main_fd_handler_active_get(fd_handler,
-                                             ECORE_FD_WRITE))
-      _ecore_con_client_flush(cl);
-
-   return ECORE_CALLBACK_RENEW;
 }
 
 static Eina_Bool
-_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
 {
-   Ecore_Con_Client *cl;
+   Ecore_Con_Server *svr;
+   Ecore_Con_Client *cl = NULL;
 
-   cl = data;
-   if (cl->dead)
+   svr = data;
+   if (svr->dead)
       return ECORE_CALLBACK_RENEW;
 
-   if (cl->delete_me)
+   if (svr->delete_me)
       return ECORE_CALLBACK_RENEW;
 
+
    if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
      {
-        unsigned char *inbuf = NULL;
-        int inbuf_num = 0;
-        int tries;
+        if (svr->threaded)
+          {
+             svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_udp_cb,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify,
+                                               NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->recv_thread)
+               goto try_nothread;
+          }
+        else
+          {
+try_nothread:
+             _ecore_con_svr_udp_cb(NULL, cl);
+          }
+     }
+   else if (ecore_main_fd_handler_active_get(fd_handler,
+                                             ECORE_FD_WRITE))
+     {
+        if (svr->threaded)
+          {
+             svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+                                               (Ecore_Cb)_ecore_con_client_flush_end,
+                                               (Ecore_Cb)kill_server, svr, EINA_FALSE);
+             if (!svr->recv_thread)
+               goto try_nothread_flush;
+          }
+        else
+try_nothread_flush:
+          _ecore_con_client_flush(NULL, cl);
+     }
+   return ECORE_CALLBACK_RENEW;
+}
 
-        for (tries = 0; tries < 16; tries++)
+static void
+_ecore_con_svr_cancel(Ecore_Con_Client *cl)
+{
+   cl->dead = 1;
+   if (cl->fd_handler)
+      ecore_main_fd_handler_del(cl->fd_handler);
+
+   cl->fd_handler = NULL;
+}
+
+static void
+_ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl) /* thread is NULL when the handler calls this directly */
+{
+   unsigned char *inbuf = NULL; /* bytes gathered so far in this call */
+   int inbuf_num = 0;
+   int tries;
+   Ecore_Con_Server *svr = cl->server; /* NOTE(review): svr is not referenced again in this function */
+   /* Read from the client (read() or the SSL helper), then report CLIENT_DATA and/or CLIENT_DEL. */
+   for (tries = 0; tries < 16; tries++) /* at most 16 reads per call */
+     {
+        int num;
+        int lost_client = 1; /* cleared below when the read result does not mean a lost client */
+        unsigned char buf[READBUFSIZ];
+
+        errno = 0;
+
+        if (!(cl->server->type & ECORE_CON_SSL))
           {
-             int num;
-             int lost_client = 1;
-             unsigned char buf[READBUFSIZ];
+             if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0)
+                if ((num < 0) && (errno == EAGAIN))
+                   lost_client = 0; /* EAGAIN: no data right now, client is kept */
 
-             errno = 0;
+          }
+        else if (!(num =
+                      ecore_con_ssl_client_read(cl, buf,
+                                                READBUFSIZ)))
+           lost_client = 0; /* SSL read returned 0: client is kept */
 
-             if (!(cl->server->type & ECORE_CON_SSL))
+        if (num < 1) /* nothing read this round: deliver what was gathered and leave the loop */
+          {
+             if (inbuf && !cl->delete_me)
                {
-                  if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0)
-                     if ((num < 0) && (errno == EAGAIN))
-                        lost_client = 0;
+                  Ecore_Con_Event_Client_Data *e;
 
+                  e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+                  if (e)
+                    {
+                       cl->event_count++; /* NOTE(review): when thread != NULL this presumably runs off the main loop - confirm it is safe */
+                       e->client = cl;
+                       e->data = inbuf; /* the gathered buffer is handed to the event */
+                       e->size = inbuf_num;
+                       if (!thread)
+                         ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
+                                    _ecore_con_event_client_data_free,
+                                    NULL);
+                       else
+                         {
+                            Ecore_Con_Thread_Data *td;
+
+                            if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+                              {
+                                 WRN("Allocation failure in ecore_con threaded server data!");
+                                 return; /* NOTE(review): e and inbuf are not freed here, and event_count stays incremented */
+                              }
+
+                            td->type = ECORE_CON_EVENT_CLIENT_DATA;
+                            td->data = e;
+                            ecore_thread_notify(thread, td); /* threaded: the event is wrapped and passed on instead of queued here */
+                         }
+                    }
                }
-             else if (!(num =
-                           ecore_con_ssl_client_read(cl, buf,
-                                                     READBUFSIZ)))
-                lost_client = 0;
 
-             if (num < 1)
+             if (lost_client)
                {
-                  if (inbuf && !cl->delete_me)
+                  if (!cl->delete_me)
                     {
-                       Ecore_Con_Event_Client_Data *e;
+                       /* we lost our client! */
+                       Ecore_Con_Event_Client_Del *e;
 
-                       e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+                       e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
                        if (e)
                          {
                             cl->event_count++;
                             e->client = cl;
-                            e->data = inbuf;
-                            e->size = inbuf_num;
-                                 ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
-                                            _ecore_con_event_client_data_free,
-                                            NULL);
-                         }
-                    }
+                            if (!thread)
+                              ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL,
+                                 e,
+                                 _ecore_con_event_client_del_free,
+                                 NULL);
+                            else
+                              {
+                                 Ecore_Con_Thread_Data *td;
 
-                  if (lost_client)
-                    {
-                       if (!cl->delete_me)
-                         {
-                            /* we lost our client! */
-                            Ecore_Con_Event_Client_Del *e;
+                                 if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+                                   {
+                                      WRN("Allocation failure in ecore_con threaded server data!");
+                                      return; /* NOTE(review): e is not freed here, and event_count stays incremented */
+                                   }
 
-                            e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
-                            if (e)
-                              {
-                                 cl->event_count++;
-                                 e->client = cl;
-                                 ecore_event_add(
-                                    ECORE_CON_EVENT_CLIENT_DEL,
-                                    e,
-                                    _ecore_con_event_client_del_free,
-                                    NULL);
+                                 td->type = ECORE_CON_EVENT_CLIENT_DEL;
+                                 td->data = e;
+                                 ecore_thread_notify(thread, td);
                               }
                          }
+                    }
 
+                  if (thread)
+                       ecore_thread_cancel(thread); /* threaded: only request cancel; the cleanup below is for the direct call */
+                  else
+                    {                       
                        cl->dead = 1;
                        if (cl->fd_handler)
                           ecore_main_fd_handler_del(cl->fd_handler);
 
                        cl->fd_handler = NULL;
                     }
+               }
 
-                  break;
-               }
-             else
-               {
-                  inbuf = realloc(inbuf, inbuf_num + num);
-                  memcpy(inbuf + inbuf_num, buf, num);
-                  inbuf_num += num;
-               }
+             break;
           }
+        else
+          {
+             inbuf = realloc(inbuf, inbuf_num + num); /* NOTE(review): result is not checked before the memcpy below */
+             memcpy(inbuf + inbuf_num, buf, num);
+             inbuf_num += num;
+          }
      }
+} /* NOTE(review): if all 16 reads return data, inbuf is neither delivered nor freed */
+
+static Eina_Bool
+_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+{
+   Ecore_Con_Client *cl;
+   /* fd handler callback for one client; data is the Ecore_Con_Client. Always returns ECORE_CALLBACK_RENEW. */
+   cl = data;
+   if (cl->dead)
+      return ECORE_CALLBACK_RENEW;
+   /* dead clients and clients pending deletion are left untouched */
+   if (cl->delete_me)
+      return ECORE_CALLBACK_RENEW;
+   /* readable: run the TCP reader in a worker thread when the server is threaded, otherwise inline */
+   if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
+     {
+        if (cl->server->threaded)
+          {
+             cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_tcp_cb,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify,
+                                               NULL,
+                                               (Ecore_Cb)_ecore_con_svr_cancel,
+                                               cl, EINA_FALSE);
+             if (!cl->server->recv_thread)
+               goto try_nothread; /* thread could not be started: fall back to the inline read */
+          }
+        else
+          {
+try_nothread:
+             _ecore_con_svr_tcp_cb(NULL, cl); /* fix: same TCP reader as the threaded path (was the UDP callback) */
+          }
+     }
    else if (ecore_main_fd_handler_active_get(fd_handler,
                                              ECORE_FD_WRITE))
-      _ecore_con_client_flush(cl);
+     {
+        if (cl->server->threaded)
+          { /* NOTE(review): the flush thread handle is stored in recv_thread, not send_thread - confirm intended */
+             cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+                                               (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+                                               (Ecore_Cb)_ecore_con_client_flush_end,
+                                               (Ecore_Cb)_ecore_con_svr_cancel, cl,
+                                               EINA_FALSE);
+             if (!cl->server->recv_thread)
+               goto try_nothread_flush; /* thread could not be started: flush inline */
+          }
+        else
+try_nothread_flush:
+          _ecore_con_client_flush(NULL, cl);
+     }
 
    return ECORE_CALLBACK_RENEW;
 }
 
 static void
-_ecore_con_server_flush(Ecore_Con_Server *svr)
+_ecore_con_server_flush_end(Ecore_Con_Server *svr) /* Releases the server write buffer once all of it has been written. */
 {
+   if (svr->write_buf_offset < svr->write_buf_size)
+     return; /* data still pending: keep the buffer as it is */
+   /* everything was written: reset the counters, free the buffer, set the fd handler to read only */
+   svr->write_buf_size = 0;
+   svr->write_buf_offset = 0;
+   free(svr->write_buf);
+   svr->write_buf = NULL;
+   if (svr->fd_handler)
+      ecore_main_fd_handler_active_set(svr->fd_handler,
+                                       ECORE_FD_READ);
+}
+
+static void
+_ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
    int count, num;
 
    if (!svr->write_buf)
@@ -1963,11 +2284,18 @@
    if (count < 1)
      {
         /* we lost our server! */
-        kill_server(svr);
+        if (!thread)
+          return kill_server(svr);
+
+        ecore_thread_cancel(thread);
         return;
      }
 
    svr->write_buf_offset += count;
+
+   if (thread)
+     return;
+
    if (svr->write_buf_offset >= svr->write_buf_size)
      {
         svr->write_buf_size = 0;
@@ -1981,8 +2309,22 @@
 }
 
 static void
-_ecore_con_client_flush(Ecore_Con_Client *cl)
+_ecore_con_client_flush_end(Ecore_Con_Client *cl) /* Releases the client send buffer once all of it has been written. */
 {
+   if (cl->buf_offset < cl->buf_size)
+     return; /* data still pending: keep the buffer as it is */
+   /* everything was written: reset the counters, free the buffer, set the fd handler to read only */
+   cl->buf_size = 0;
+   cl->buf_offset = 0;
+   free(cl->buf);
+   cl->buf = NULL;
+   if (cl->fd_handler)
+      ecore_main_fd_handler_active_set(cl->fd_handler, ECORE_FD_READ);
+}
+
+static void
+_ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl)
+{
    int count, num;
 
    if (!cl->buf)
@@ -2009,11 +2351,32 @@
                   {
                      cl->event_count++;
                      e->client = cl;
-                     ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
-                                     _ecore_con_event_client_del_free, NULL);
+                     if (!thread)
+                       ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
+                                       _ecore_con_event_client_del_free, NULL);
+                       else
+                         {
+                            Ecore_Con_Thread_Data *td;
+
+                            if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+                              {
+                                 WRN("Allocation failure in ecore_con threaded server data!");
+                                 return;
+                              }
+
+                            td->type = ECORE_CON_EVENT_CLIENT_DEL;
+                            td->data = e;
+                            ecore_thread_notify(thread, td);
+                         }
                   }
 
                 cl->dead = 1;
+                if (thread)
+                  {
+                     ecore_thread_cancel(thread);
+                     return;
+                  }
+
                 if (cl->fd_handler)
                    ecore_main_fd_handler_del(cl->fd_handler);
 
@@ -2024,6 +2387,8 @@
      }
 
    cl->buf_offset += count;
+   if (thread)
+     return;
    if (cl->buf_offset >= cl->buf_size)
      {
         cl->buf_size = 0;
------------------------------------------------------------------------------
This SF.net email is sponsored by 

Make an app they can't live without
Enter the BlackBerry Developer Challenge
http://p.sf.net/sfu/RIM-dev2dev 
_______________________________________________
enlightenment-devel mailing list
enlightenment-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/enlightenment-devel

Reply via email to