barbieri pushed a commit to branch master.

http://git.enlightenment.org/core/efl.git/commit/?id=6f3220ffc6140594792c596ebb452149e19be00f

commit 6f3220ffc6140594792c596ebb452149e19be00f
Author: Gustavo Sverzut Barbieri <barbi...@profusion.mobi>
Date:   Wed Nov 23 19:45:33 2016 -0200

    ecore_ipc: convert ecore_ipc_server_add() to Efl_Net_Server.
    
    Each client (Ecore_Ipc_Client) is very similar to the handle
    configured by ecore_ipc_server_connect() (the dialer), except that we
    do not have events such as "connected" and "error", and we don't
    delete the socket since it's owned by the server; instead we close it.
    
    The UNIX socket is configured similarly to ecore_con, setting the same
    masks and mode for directories.
---
 src/lib/ecore_ipc/ecore_ipc.c         | 446 +++++++++++++++++++++++++++++++++-
 src/lib/ecore_ipc/ecore_ipc_private.h |  10 +
 2 files changed, 450 insertions(+), 6 deletions(-)

diff --git a/src/lib/ecore_ipc/ecore_ipc.c b/src/lib/ecore_ipc/ecore_ipc.c
index 8a38815..82d5819 100644
--- a/src/lib/ecore_ipc/ecore_ipc.c
+++ b/src/lib/ecore_ipc/ecore_ipc.c
@@ -295,6 +295,8 @@ ecore_ipc_client_add(Ecore_Ipc_Server *svr)
    cl = calloc(1, sizeof(Ecore_Ipc_Client));
    EINA_SAFETY_ON_NULL_RETURN_VAL(cl, NULL);
    cl->svr = svr;
+   cl->max_buf_size = 32 * 1024;
+
    ECORE_MAGIC_SET(cl, ECORE_MAGIC_IPC_CLIENT);
    svr->clients = eina_list_append(svr->clients, cl);
 
@@ -362,9 +364,27 @@ ecore_ipc_shutdown(void)
    return _ecore_ipc_init_count;
 }
 
-/* FIXME: need to add protocol type parameter */
-EAPI Ecore_Ipc_Server *
-ecore_ipc_server_add(Ecore_Ipc_Type compl_type, const char *name, int port, 
const void *data)
+/* Delete the Efl.Net.Server object stored in svr->server, if there is one,
+ * and reset the pointer. The Ecore_Ipc_Server struct itself is left alone.
+ */
+static void
+_ecore_ipc_server_del(Ecore_Ipc_Server *svr)
+{
+   DBG("server %p del", svr);
+
+   if (svr->server)
+     {
+        efl_del(svr->server);
+        svr->server = NULL;
+     }
+}
+
+/* Forward declaration: the handler body comes later in this file. */
+static void _ecore_ipc_server_client_add(void *data, const Efl_Event *event);
+
+/* Event callbacks for the Efl.Net.Server handle: new-client notification. */
+EFL_CALLBACKS_ARRAY_DEFINE(_ecore_ipc_server_cbs,
+                           { EFL_NET_SERVER_EVENT_CLIENT_ADD, 
_ecore_ipc_server_client_add });
+
+
+#ifndef EFL_NET_SERVER_UNIX_CLASS
+static Ecore_Ipc_Server *
+ecore_ipc_server_add_legacy(Ecore_Ipc_Type compl_type, char *name, int port, 
const void *data)
 {
    Ecore_Ipc_Server *svr;
    Ecore_Ipc_Type type;
@@ -403,6 +423,190 @@ ecore_ipc_server_add(Ecore_Ipc_Type compl_type, const 
char *name, int port, cons
    ECORE_MAGIC_SET(svr, ECORE_MAGIC_IPC_SERVER);
    return svr;
 }
+#else
+/* Create the parent directory of 'path', including any missing ancestors
+ * (the equivalent of "mkdir -p" on dirname(path)). Every directory that has
+ * to be created gets 'mode'.
+ *
+ * 'path' must be absolute. Returns EINA_TRUE when the parent directory was
+ * created or already exists as a directory, EINA_FALSE otherwise (NULL or
+ * relative path, allocation failure, mkdir() failure, or the final component
+ * exists but is not a directory).
+ */
+static Eina_Bool
+_ecore_ipc_local_mkpath(const char *path, mode_t mode)
+{
+   Eina_Bool ret = EINA_FALSE;
+   char *s, *d, *itr;
+
+   if (!path) return EINA_FALSE;
+   EINA_SAFETY_ON_TRUE_RETURN_VAL(path[0] != '/', EINA_FALSE);
+
+   /* dirname() may modify its argument, so work on a private copy */
+   s = strdup(path);
+   EINA_SAFETY_ON_NULL_RETURN_VAL(s, EINA_FALSE);
+   d = dirname(s);
+   /* leave through 'end' so the copy is released on this path as well */
+   EINA_SAFETY_ON_NULL_GOTO(d, end);
+
+   /* walk the directory part, temporarily cutting the string at each '/'
+    * to create one ancestor at a time (the leading '/' is skipped) */
+   for (itr = d + 1; *itr != '\0'; itr++)
+     {
+        if (*itr == '/')
+          {
+             *itr = '\0';
+             if (mkdir(d, mode) != 0)
+               {
+                  if (errno != EEXIST)
+                    {
+                       ERR("could not create parent directory '%s' of path '%s': %s", d, path, strerror(errno));
+                       goto end;
+                    }
+               }
+             *itr = '/';
+          }
+     }
+
+   /* last component: if it already exists it must really be a directory */
+   if (mkdir(d, mode) != 0)
+     {
+        if (errno != EEXIST)
+          ERR("could not create parent directory '%s' of path '%s': %s", d, path, strerror(errno));
+        else
+          {
+             struct stat st;
+             if ((stat(d, &st) != 0) || (!S_ISDIR(st.st_mode)))
+               ERR("could not create parent directory '%s' of path '%s': exists but is not a directory", d, path);
+             else ret = EINA_TRUE;
+          }
+     }
+   else ret = EINA_TRUE;
+
+ end:
+   free(s);
+   return ret;
+}
+#endif
+
+/* Create an IPC server on top of Efl.Net.Server.
+ *
+ * The transport is selected by (type & ECORE_IPC_TYPE):
+ *  - ECORE_IPC_LOCAL_USER / ECORE_IPC_LOCAL_SYSTEM: UNIX socket at the path
+ *    built by ecore_con_local_path_new(). When EFL_NET_SERVER_UNIX_CLASS is
+ *    not available these are handed to the legacy Ecore_Con implementation.
+ *  - ECORE_IPC_REMOTE_SYSTEM: "name:port" served over TCP, or over SSL when
+ *    ECORE_IPC_USE_SSL is set in 'type'. 'port' must be greater than zero.
+ *
+ * 'data' is stored in the returned handle. Returns NULL on any failure.
+ */
+/* FIXME: need to add protocol type parameter */
+EAPI Ecore_Ipc_Server *
+ecore_ipc_server_add(Ecore_Ipc_Type type, const char *name, int port, const void *data)
+{
+   Ecore_Ipc_Server *svr;
+   Eo *loop = ecore_main_loop_get();
+   char *address = NULL;
+   Eina_Error err;
+#ifdef EFL_NET_SERVER_UNIX_CLASS
+   mode_t old_mask = 0, new_mask = 0;
+#endif
+
+   EINA_SAFETY_ON_NULL_RETURN_VAL(name, NULL);
+
+#ifndef EFL_NET_SERVER_UNIX_CLASS
+   if (((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_USER) ||
+       ((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_SYSTEM))
+     {
+        DBG("no 'local' Efl.Net.Server for your system  yet, use legacy Ecore_Con");
+        return ecore_ipc_server_add_legacy(type, name, port, data);
+     }
+#endif
+
+   svr = calloc(1, sizeof(Ecore_Ipc_Server));
+   EINA_SAFETY_ON_NULL_RETURN_VAL(svr, NULL);
+
+   /* empty statement that anchors the "else if" chain below when the UNIX
+    * branches are compiled out */
+   if (0) { }
+#ifdef EFL_NET_SERVER_UNIX_CLASS
+   if ((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_USER)
+     {
+        address = ecore_con_local_path_new(EINA_FALSE, name, port);
+        EINA_SAFETY_ON_NULL_GOTO(address, error_server);
+
+        /* 'address' is released at error_server; freeing it here as well
+         * would be a double free */
+        if (!_ecore_ipc_local_mkpath(address, S_IRUSR | S_IWUSR | S_IXUSR))
+          goto error_server;
+
+        /* umask applied while serving: strip all group/other permissions */
+        new_mask = S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IWOTH | S_IXOTH;
+
+        svr->server = efl_add(EFL_NET_SERVER_UNIX_CLASS, loop);
+        EINA_SAFETY_ON_NULL_GOTO(svr->server, error_server);
+     }
+   else if ((type & ECORE_IPC_TYPE) == ECORE_IPC_LOCAL_SYSTEM)
+     {
+        address = ecore_con_local_path_new(EINA_TRUE, name, port);
+        EINA_SAFETY_ON_NULL_GOTO(address, error_server);
+
+        /* ecore_con didn't create leading directories for LOCAL_SYSTEM */
+
+        new_mask = 0;
+
+        svr->server = efl_add(EFL_NET_SERVER_UNIX_CLASS, loop);
+        EINA_SAFETY_ON_NULL_GOTO(svr->server, error_server);
+     }
+#endif /* EFL_NET_SERVER_UNIX_CLASS */
+   else if ((type & ECORE_IPC_TYPE) == ECORE_IPC_REMOTE_SYSTEM)
+     {
+        char buf[4096];
+
+        if (port <= 0)
+          {
+             ERR("remote system requires port>0, got %d", port);
+             goto error_server;
+          }
+
+        snprintf(buf, sizeof(buf), "%s:%d", name, port);
+        address = strdup(buf);
+        EINA_SAFETY_ON_NULL_GOTO(address, error_server);
+
+        if ((type & ECORE_IPC_USE_SSL) == ECORE_IPC_USE_SSL)
+          {
+             svr->server = efl_add(EFL_NET_SERVER_SSL_CLASS, loop);
+             EINA_SAFETY_ON_NULL_GOTO(svr->server, error_server);
+          }
+        else
+          {
+             svr->server = efl_add(EFL_NET_SERVER_TCP_CLASS, loop);
+             EINA_SAFETY_ON_NULL_GOTO(svr->server, error_server);
+          }
+     }
+   else
+     {
+        ERR("IPC Type must be one of: local_user, local_system or remote_system");
+        goto error_server;
+     }
+
+   efl_event_callback_array_add(svr->server, _ecore_ipc_server_cbs(), svr);
+
+   /* for UNIX sockets the umask is changed only around the serve() call
+    * and restored right after it */
+#ifdef EFL_NET_SERVER_UNIX_CLASS
+   if (efl_isa(svr->server, EFL_NET_SERVER_UNIX_CLASS))
+     old_mask = umask(new_mask);
+#endif
+
+   err = efl_net_server_serve(svr->server, address);
+
+#ifdef EFL_NET_SERVER_UNIX_CLASS
+   if (efl_isa(svr->server, EFL_NET_SERVER_UNIX_CLASS))
+     umask(old_mask);
+#endif
+
+   if (err)
+     {
+        WRN("Could not serve %s %s: %s",
+            efl_class_name_get(efl_class_get(svr->server)),
+            address, eina_error_msg_get(err));
+        goto error;
+     }
+   DBG("will serve %p %s address='%s'",
+       svr->server,
+       efl_class_name_get(efl_class_get(svr->server)),
+       address);
+
+   svr->max_buf_size = 32 * 1024;
+   svr->data = (void *)data;
+   servers = eina_list_append(servers, svr);
+   ECORE_MAGIC_SET(svr, ECORE_MAGIC_IPC_SERVER);
+   free(address);
+   return svr;
+
+   /* the Efl.Net.Server object exists: delete it before freeing the handle */
+ error:
+   free(address);
+   _ecore_ipc_server_del(svr);
+   free(svr);
+   return NULL;
+
+   /* no Efl.Net.Server object was created: only the handle and the address
+    * (possibly still NULL) need to be released */
+ error_server:
+   free(address);
+   free(svr);
+   return NULL;
+}
 
 static void
 _ecore_ipc_dialer_del(Ecore_Ipc_Server *svr)
@@ -713,6 +917,7 @@ ecore_ipc_server_del(Ecore_Ipc_Server *svr)
           }
 
         if (svr->dialer.dialer) _ecore_ipc_dialer_del(svr);
+        if (svr->server) _ecore_ipc_server_del(svr);
         if (svr->legacy_server) ecore_con_server_del(svr->legacy_server);
         servers = eina_list_remove(servers, svr);
 
@@ -749,6 +954,7 @@ ecore_ipc_server_connected_get(Ecore_Ipc_Server *svr)
 
    if (svr->dialer.dialer)
      return efl_net_dialer_connected_get(svr->dialer.dialer);
+   else if (svr->server) return EINA_TRUE;
    else if (!svr->legacy_server) return EINA_FALSE;
 
    return ecore_con_server_connected_get(svr->legacy_server);
@@ -876,6 +1082,11 @@ ecore_ipc_server_send(Ecore_Ipc_Server *svr, int major, 
int minor, int ref, int
 
         return s + size;
      }
+   else if (svr->server)
+     {
+        ERR("Send data to clients, not the server handle");
+        return 0;
+     }
    else if (!svr->legacy_server) return 0;
 
    ret = ecore_con_server_send(svr->legacy_server, dat, s);
@@ -892,6 +1103,12 @@ ecore_ipc_server_client_limit_set(Ecore_Ipc_Server *svr, 
int client_limit, char
                          "ecore_ipc_server_client_limit_set");
         return;
      }
+   if (svr->server)
+     {
+        efl_net_server_clients_limit_set(svr->server, client_limit, 
reject_excess_clients);
+        return;
+     }
+   else if (!svr->legacy_server) return;
    ecore_con_server_client_limit_set(svr->legacy_server, client_limit, 
reject_excess_clients);
 }
 
@@ -937,6 +1154,14 @@ ecore_ipc_server_ip_get(Ecore_Ipc_Server *svr)
         /* original IPC just returned IP for remote connections */
         return NULL;
      }
+   else if (svr->server)
+     {
+        if (efl_isa(svr->server, EFL_NET_SERVER_TCP_CLASS) ||
+            efl_isa(svr->server, EFL_NET_SERVER_SSL_CLASS))
+          return efl_net_server_address_get(svr->server);
+        /* original IPC just returned IP for remote connections */
+        return NULL;
+     }
    else if (!svr->legacy_server) return NULL;
 
    return ecore_con_server_ip_get(svr->legacy_server);
@@ -957,6 +1182,11 @@ ecore_ipc_server_flush(Ecore_Ipc_Server *svr)
           efl_io_copier_flush(svr->dialer.send_copier);
         return;
      }
+   else if (svr->server)
+     {
+        ERR("Flush clients, not the server handle");
+        return;
+     }
    else if (!svr->legacy_server) return;
 
    ecore_con_server_flush(svr->legacy_server);
@@ -1009,8 +1239,16 @@ ecore_ipc_client_send(Ecore_Ipc_Client *cl, int major, 
int minor, int ref, int r
                          "ecore_ipc_client_send");
         return 0;
      }
-   EINA_SAFETY_ON_TRUE_RETURN_VAL(!cl->client, 0);
-   EINA_SAFETY_ON_TRUE_RETURN_VAL(!ecore_con_client_connected_get(cl->client), 
0);
+   if (cl->socket.socket)
+     
EINA_SAFETY_ON_TRUE_RETURN_VAL(efl_io_closer_closed_get(cl->socket.socket), 0);
+   else if (cl->client)
+     
EINA_SAFETY_ON_TRUE_RETURN_VAL(!ecore_con_client_connected_get(cl->client), 0);
+   else
+     {
+        ERR("client %p is not connected", cl);
+        return 0;
+     }
+
    if (size < 0) size = 0;
    msg.major    = major;
    msg.minor    = minor;
@@ -1034,6 +1272,48 @@ ecore_ipc_client_send(Ecore_Ipc_Client *cl, int major, 
int minor, int ref, int r
    *head |= md << (4 * 5);
    *head = htonl(*head);
    cl->prev.o = msg;
+
+   if (cl->socket.input)
+     {
+        Eina_Slice slice;
+        Eina_Error err;
+
+        slice.mem = dat;
+        slice.len = s;
+        err = efl_io_writer_write(cl->socket.input, &slice, NULL);
+        if (err)
+          {
+             ERR("could not write queue=%p %zd bytes: %s",
+                 cl->socket.input, slice.len, eina_error_msg_get(err));
+             return 0;
+          }
+        if (slice.len < (size_t)s)
+          {
+             ERR("only wrote %zd of %d bytes to queue %p",
+                 slice.len, s, cl->socket.input);
+             return 0;
+          }
+
+        slice.mem = data;
+        slice.len = size;
+        err = efl_io_writer_write(cl->socket.input, &slice, NULL);
+        if (err)
+          {
+             ERR("could not write queue=%p %zd bytes: %s",
+                 cl->socket.input, slice.len, eina_error_msg_get(err));
+             return 0;
+          }
+        if (slice.len < (size_t)size)
+          {
+             ERR("only wrote %zd of %d bytes to queue %p",
+                 slice.len, size, cl->socket.input);
+             return 0;
+          }
+
+        return s + size;
+     }
+   else if (!cl->client) return 0;
+
    ret = ecore_con_client_send(cl->client, dat, s);
    if (size > 0) ret += ecore_con_client_send(cl->client, data, size);
    return ret;
@@ -1051,6 +1331,140 @@ ecore_ipc_client_server_get(Ecore_Ipc_Client *cl)
    return cl->svr;
 }
 
+static Efl_Callback_Array_Item *_ecore_ipc_socket_cbs(void);
+
+/* Release the Efl objects attached to a server-side client: the receive and
+ * send copiers, the input queue and finally the socket. Each pointer is
+ * reset to NULL once released, so calling this again is harmless.
+ */
+static void
+_ecore_ipc_client_socket_del(Ecore_Ipc_Client *cl)
+{
+   DBG("client %p socket del", cl);
+
+   if (cl->socket.recv_copier)
+     {
+        efl_del(cl->socket.recv_copier);
+        cl->socket.recv_copier = NULL;
+     }
+
+   if (cl->socket.send_copier)
+     {
+        efl_del(cl->socket.send_copier);
+        cl->socket.send_copier = NULL;
+     }
+
+   if (cl->socket.input)
+     {
+        efl_del(cl->socket.input);
+        cl->socket.input = NULL;
+     }
+
+   if (cl->socket.socket)
+     {
+        efl_event_callback_array_del(cl->socket.socket, 
_ecore_ipc_socket_cbs(), cl);
+        /* do not del() as it's owned by srv->server */
+        if (!efl_io_closer_closed_get(cl->socket.socket))
+          efl_io_closer_close(cl->socket.socket);
+        /* drops the reference taken with efl_ref() when the client was added */
+        efl_unref(cl->socket.socket);
+        cl->socket.socket = NULL;
+     }
+}
+
+/* EFL_IO_READER_EVENT_EOS handler for a client socket: releases the client's
+ * socket resources and then posts the client "del" event. 'data' is the
+ * Ecore_Ipc_Client the callback was registered with.
+ */
+static void
+_ecore_ipc_client_socket_eos(void *data, const Efl_Event *event EINA_UNUSED)
+{
+   Ecore_Ipc_Client *cl = data;
+
+   DBG("client %p socket %p eos", cl, cl->socket.socket);
+
+   _ecore_ipc_client_socket_del(cl);
+
+   ecore_ipc_post_event_client_del(cl);
+}
+
+/* Event callbacks for a client socket: end-of-stream notification. */
+EFL_CALLBACKS_ARRAY_DEFINE(_ecore_ipc_socket_cbs,
+                           { EFL_IO_READER_EVENT_EOS, 
_ecore_ipc_client_socket_eos });
+
+static Eina_Bool ecore_ipc_client_data_process(Ecore_Ipc_Client *cl, void 
*data, int size, Eina_Bool *stolen);
+
+/* EFL_IO_COPIER_EVENT_DATA handler for a client's receive copier: takes
+ * ownership of the bytes accumulated by the copier and passes them to
+ * ecore_ipc_client_data_process(). The buffer is freed here unless the
+ * processing function reports that it kept ("stole") it.
+ */
+static void
+_ecore_ipc_client_socket_copier_data(void *data, const Efl_Event *event EINA_UNUSED)
+{
+   Ecore_Ipc_Client *cl = data;
+   Eina_Binbuf *binbuf;
+   uint8_t *mem;
+   int size;
+   /* start as "not stolen" so the flag is never read uninitialized if the
+    * processing function returns without writing to it */
+   Eina_Bool stolen = EINA_FALSE;
+
+   DBG("client %p recv_copier %p data", cl, cl->socket.recv_copier);
+
+   binbuf = efl_io_copier_binbuf_steal(cl->socket.recv_copier);
+   EINA_SAFETY_ON_NULL_RETURN(binbuf);
+   /* read the length first: stealing the string detaches it from binbuf */
+   size = eina_binbuf_length_get(binbuf);
+   mem = eina_binbuf_string_steal(binbuf);
+   eina_binbuf_free(binbuf);
+
+   ecore_ipc_client_data_process(cl, mem, size, &stolen);
+   if (!stolen) free(mem);
+}
+
+/* EFL_IO_COPIER_EVENT_ERROR handler for a client's copiers: logs the error
+ * carried in event->info (read as an Eina_Error pointer) and closes the
+ * client socket if it is still open.
+ */
+static void
+_ecore_ipc_client_socket_copier_error(void *data, const Efl_Event *event)
+{
+   Ecore_Ipc_Client *cl = data;
+   Eina_Error *perr = event->info;
+
+   WRN("client %p socket %p copier %p error %s", cl, cl->socket.socket, 
event->object, eina_error_msg_get(*perr));
+
+   if (!efl_io_closer_closed_get(cl->socket.socket))
+     efl_io_closer_close(cl->socket.socket);
+}
+
+/* Event callbacks for a client's copiers: error notification. */
+EFL_CALLBACKS_ARRAY_DEFINE(_ecore_ipc_client_socket_copier_cbs,
+                           { EFL_IO_COPIER_EVENT_ERROR, 
_ecore_ipc_client_socket_copier_error });
+
+/* EFL_NET_SERVER_EVENT_CLIENT_ADD handler: wraps the newly accepted socket
+ * (event->info) in an Ecore_Ipc_Client and posts the client "add" event.
+ *
+ * Per client the following objects are created on the socket's loop:
+ *  - an Efl.Io.Queue ("input") holding data waiting to be sent;
+ *  - a send copier moving data from that queue to the socket;
+ *  - a receive copier reading from the socket, reporting incoming bytes
+ *    through EFL_IO_COPIER_EVENT_DATA.
+ * Close-on-destructor is disabled on both copiers.
+ *
+ * 'data' is the Ecore_Ipc_Server the callback array was registered with.
+ */
+static void
+_ecore_ipc_server_client_add(void *data, const Efl_Event *event)
+{
+   Ecore_Ipc_Server *svr = data;
+   Eo *socket = event->info;
+   Ecore_Ipc_Client *cl;
+   Eo *loop;
+
+   /* log the client socket itself; event->object is the server again */
+   DBG("server %p %p got new client %p (%s)",
+       svr, svr->server,
+       socket, efl_net_socket_address_remote_get(socket));
+
+   cl = ecore_ipc_client_add(svr);
+   EINA_SAFETY_ON_NULL_RETURN(cl);
+
+   /* keep our own reference; it is dropped in _ecore_ipc_client_socket_del() */
+   cl->socket.socket = efl_ref(socket);
+   efl_event_callback_array_add(cl->socket.socket, _ecore_ipc_socket_cbs(), cl);
+
+   loop = efl_loop_get(socket);
+
+   cl->socket.input = efl_add(EFL_IO_QUEUE_CLASS, loop);
+   EINA_SAFETY_ON_NULL_GOTO(cl->socket.input, error);
+
+   cl->socket.send_copier = efl_add(EFL_IO_COPIER_CLASS, loop,
+                                     efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
+                                     efl_io_copier_source_set(efl_added, cl->socket.input),
+                                     efl_io_copier_destination_set(efl_added, cl->socket.socket),
+                                     efl_event_callback_array_add(efl_added, _ecore_ipc_client_socket_copier_cbs(), cl));
+   EINA_SAFETY_ON_NULL_GOTO(cl->socket.send_copier, error);
+
+   cl->socket.recv_copier = efl_add(EFL_IO_COPIER_CLASS, loop,
+                                     efl_io_closer_close_on_destructor_set(efl_added, EINA_FALSE),
+                                     efl_io_copier_source_set(efl_added, cl->socket.socket),
+                                     efl_event_callback_array_add(efl_added, _ecore_ipc_client_socket_copier_cbs(), cl),
+                                     efl_event_callback_add(efl_added, EFL_IO_COPIER_EVENT_DATA, _ecore_ipc_client_socket_copier_data, cl));
+   EINA_SAFETY_ON_NULL_GOTO(cl->socket.recv_copier, error);
+
+   ecore_ipc_post_event_client_add(cl);
+   return;
+
+ error:
+   _ecore_ipc_client_socket_del(cl);
+   /* ecore_ipc_client_add() appended cl to svr->clients; unlink it before
+    * freeing so the list never holds a dangling pointer */
+   svr->clients = eina_list_remove(svr->clients, cl);
+   free(cl);
+}
+
 EAPI void *
 ecore_ipc_client_del(Ecore_Ipc_Client *cl)
 {
@@ -1070,6 +1484,7 @@ ecore_ipc_client_del(Ecore_Ipc_Client *cl)
    if (cl->event_count == 0)
      {
         svr = cl->svr;
+        if (cl->socket.socket) _ecore_ipc_client_socket_del(cl);
         if (cl->client) ecore_con_client_del(cl->client);
         if (ECORE_MAGIC_CHECK(svr, ECORE_MAGIC_IPC_SERVER))
           svr->clients = eina_list_remove(svr->clients, cl);
@@ -1137,6 +1552,18 @@ ecore_ipc_client_ip_get(Ecore_Ipc_Client *cl)
                          "ecore_ipc_client_ip_get");
         return NULL;
      }
+   if (cl->socket.socket)
+     {
+        if (efl_isa(cl->socket.socket, EFL_NET_SOCKET_TCP_CLASS) ||
+            efl_isa(cl->socket.socket, EFL_NET_SOCKET_SSL_CLASS))
+          return efl_net_socket_address_remote_get(cl->socket.socket);
+        /* original IPC just returned IP for remote connections,
+         * for unix socket it returned 0.0.0.0
+         */
+        return "0.0.0.0";
+     }
+   else if (!cl->client) return NULL;
+
    return ecore_con_client_ip_get(cl->client);
 }
 
@@ -1149,6 +1576,14 @@ ecore_ipc_client_flush(Ecore_Ipc_Client *cl)
                          "ecore_ipc_client_flush");
         return;
      }
+   if (cl->socket.input)
+     {
+        while (efl_io_queue_usage_get(cl->socket.input) > 0)
+          efl_io_copier_flush(cl->socket.send_copier);
+        return;
+     }
+   else if (!cl->client) return;
+
    ecore_con_client_flush(cl->client);
 }
 
@@ -1174,7 +1609,6 @@ _ecore_ipc_event_client_add(void *data EINA_UNUSED, int 
ev_type EINA_UNUSED, voi
 
         if (!cl) return ECORE_CALLBACK_CANCEL;
         cl->client = e->client;
-        cl->max_buf_size = 32 * 1024;
         ecore_con_client_data_set(cl->client, (void *)cl);
 
         ecore_ipc_post_event_client_add(cl);
diff --git a/src/lib/ecore_ipc/ecore_ipc_private.h 
b/src/lib/ecore_ipc/ecore_ipc_private.h
index 9f0def1..baf71fd 100644
--- a/src/lib/ecore_ipc/ecore_ipc_private.h
+++ b/src/lib/ecore_ipc/ecore_ipc_private.h
@@ -66,6 +66,14 @@ struct _Ecore_Ipc_Msg_Head
 struct _Ecore_Ipc_Client
 {
    ECORE_MAGIC;
+
+   struct {
+      Eo *input;
+      Eo *socket;
+      Eo *recv_copier;
+      Eo *send_copier;
+   } socket;
+
    Ecore_Con_Client  *client;
    Ecore_Ipc_Server  *svr;
    void              *data;
@@ -93,6 +101,8 @@ struct _Ecore_Ipc_Server
       Eo *send_copier;
    } dialer;
 
+   Eo *server;
+
    Ecore_Con_Server *legacy_server;
    Eina_List        *clients;
    void              *data;

-- 


Reply via email to