Hey,

as i have succedded in integrating the gstreamer thread in the ecore main loop on Windows, and as some of you talked about a pipe wrapper, I've quickly written a wrapper around pipe(), read() and write().

I have attached the source file to put in ecore/src/lib/ecore/ and a diff.

There is a complete example that uses that wrapper with gstreamer (the test I used on Windows, btw) and the documentation is here too.

That can, maybe, help people to integrate other threads or loops (like the threaded glib loop) in the ecore main loop.

feel free to comment :)

Vincent
? ecore.patch
? ecore_pipe.diff
? src/lib/ecore/ecore_pipe.c
Index: configure.in
===================================================================
RCS file: /cvs/e/e17/libs/ecore/configure.in,v
retrieving revision 1.247
diff -u -r1.247 configure.in
--- configure.in        25 Jan 2008 03:35:46 -0000      1.247
+++ configure.in        25 Jan 2008 19:39:22 -0000
@@ -730,7 +730,7 @@
 ECORE_CHECK_MODULE([IMF], [yes])
 
 try_ecore_imf_evas=no
-if test "x$have_ecore_imf" = xyes -a "x$have_evas" = "xyes"; then
+if test "x$have_ecore_imf" = "xyes" -a "x$have_evas" = "xyes"; then
   try_ecore_imf_evas=yes
 fi
 
Index: src/lib/ecore/Ecore.h
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/Ecore.h,v
retrieving revision 1.58
diff -u -r1.58 Ecore.h
--- src/lib/ecore/Ecore.h       25 Jan 2008 18:28:16 -0000      1.58
+++ src/lib/ecore/Ecore.h       25 Jan 2008 19:39:22 -0000
@@ -118,6 +118,7 @@
    typedef void Ecore_Event; /**< A handle for an event */
    typedef void Ecore_Animator; /**< A handle for animators */
    typedef void Ecore_Poller; /**< A handle for pollers */
+   typedef void Ecore_Pipe; /**< A handle for pipes */
 #endif
    typedef struct _Ecore_Event_Signal_User     Ecore_Event_Signal_User; /**< 
User signal event */
    typedef struct _Ecore_Event_Signal_Hup      Ecore_Event_Signal_Hup; /**< 
Hup signal event */
@@ -290,6 +291,11 @@
    EAPI double        ecore_poller_poll_interval_get(Ecore_Poller_Type type);
    EAPI Ecore_Poller *ecore_poller_add(Ecore_Poller_Type type, int interval, 
int (*func) (void *data), const void *data);
    EAPI void         *ecore_poller_del(Ecore_Poller *poller);
+
+   EAPI Ecore_Pipe *ecore_pipe_new (void (*handler) (void *data));
+   EAPI void ecore_pipe_free(Ecore_Pipe *pipe);
+   EAPI void ecore_pipe_write(Ecore_Pipe *pipe,
+                              void       *data);
 
    
 #ifdef __cplusplus
Index: src/lib/ecore/Makefile.am
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/Makefile.am,v
retrieving revision 1.14
diff -u -r1.14 Makefile.am
--- src/lib/ecore/Makefile.am   11 Jan 2008 07:33:56 -0000      1.14
+++ src/lib/ecore/Makefile.am   25 Jan 2008 19:39:22 -0000
@@ -32,6 +32,7 @@
 ecore_tree.c \
 ecore_value.c \
 ecore_poll.c \
+ecore_pipe.c \
 ecore_private.h
 
 libecore_la_LIBADD = @dlopen_libs@ @winsock_libs@ -lm
Index: src/lib/ecore/ecore_private.h
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/ecore_private.h,v
retrieving revision 1.46
diff -u -r1.46 ecore_private.h
--- src/lib/ecore/ecore_private.h       11 Jan 2008 07:33:56 -0000      1.46
+++ src/lib/ecore/ecore_private.h       25 Jan 2008 19:39:23 -0000
@@ -241,6 +241,7 @@
 typedef struct _Ecore_Event         Ecore_Event;
 typedef struct _Ecore_Animator      Ecore_Animator;
 typedef struct _Ecore_Poller        Ecore_Poller;
+typedef struct _Ecore_Pipe          Ecore_Pipe;
 
 #ifndef _WIN32
 struct _Ecore_Exe
@@ -386,6 +387,13 @@
    signed char  delete_me : 1;
    int        (*func) (void *data);
    void        *data;
+};
+
+struct _Ecore_Pipe
+{
+   int    fd_read;
+   int    fd_write;
+   void (*handler) (void *data);
 };
 
 #endif
#include <stdlib.h>



#include "ecore_private.h"

#include "Ecore.h"





#ifdef _WIN32



# include <winsock2.h>



static int

e_pipe(int *fds)

{

   struct sockaddr_in saddr;

   struct timeval     tv;

   SOCKET             temp;

   SOCKET             socket1 = INVALID_SOCKET;

   SOCKET             socket2 = INVALID_SOCKET;

   u_long             arg;

   fd_set             read_set;

   fd_set             write_set;

   int                len;



   temp = socket (AF_INET, SOCK_STREAM, 0);



   if (temp == INVALID_SOCKET)

     goto out0;



   arg = 1;

   if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR)

     goto out0;



   memset (&saddr, 0, sizeof (saddr));

   saddr.sin_family = AF_INET;

   saddr.sin_port = 0;

   saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);



   if (bind (temp, (struct sockaddr *)&saddr, sizeof (saddr)))

     goto out0;



   if (listen (temp, 1) == SOCKET_ERROR)

     goto out0;



   len = sizeof (saddr);

   if (getsockname (temp, (struct sockaddr *)&saddr, &len))

     goto out0;



   socket1 = socket (AF_INET, SOCK_STREAM, 0);



   if (socket1 == INVALID_SOCKET)

     goto out0;



   arg = 1;

   if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR)

      goto out1;



   if ((connect (socket1, (struct sockaddr  *)&saddr, len) != SOCKET_ERROR) ||

       (WSAGetLastError () != WSAEWOULDBLOCK))

     goto out1;



   FD_ZERO (&read_set);

   FD_SET (temp, &read_set);



   tv.tv_sec = 0;

   tv.tv_usec = 0;



   if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR)

     goto out1;



   if (!FD_ISSET (temp, &read_set))

     goto out1;



   socket2 = accept (temp, (struct sockaddr *) &saddr, &len);

   if (socket2 == INVALID_SOCKET)

     goto out1;



   FD_ZERO (&write_set);

   FD_SET (socket1, &write_set);



   tv.tv_sec = 0;

   tv.tv_usec = 0;



   if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR)

     goto out2;



   if (!FD_ISSET (socket1, &write_set))

     goto out2;



   arg = 0;

   if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR)

     goto out2;



   arg = 0;

   if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR)

     goto out2;



   fds[0] = socket1;

   fds[1] = socket2;



   closesocket (temp);



   return 0;



 out2:

   closesocket (socket2);

 out1:

   closesocket (socket1);

 out0:

   closesocket (temp);



   fds[0] = INVALID_SOCKET;

   fds[1] = INVALID_SOCKET;



   return -1;

}



# define e_write(fd, buffer) \

  send((fd), (char *)(buffer), sizeof(buffer), 0)



# define e_read(fd, buffer) \

  recv((fd), (char *)(buffer), sizeof(buffer), 0)



#else



# include <unistd.h>

# include <fcntl.h>



# define e_pipe(fds) \

  pipe(fds)



# define e_write(fd, buffer) \

  write((fd), (buffer), sizeof(buffer))



# define e_read(fd, buffer) \

  read((fd), (buffer), sizeof(buffer))



#endif /* _WIN32 */





static int _ecore_pipe_read(void             *data,

                            Ecore_Fd_Handler *fd_handler);



/**

 * @defgroup Ecore_Pipe_Group Pipe wrapper

 *

 * These functions wrap the pipe / write / read functions to

 * easily integrate a loop that is in its own thread to the ecore

 * main loop.

 *

 * The ecore_pipe_new() function creates file descriptors (sockets on

 * Windows) and attach an handle to the ecore main loop. That handle is

 * called when data is read in the pipe. To write data in the pipe,

 * just call ecore_pipe_write(). When you are done, just call

 * ecore_pipe_free().

 *

 * Here is an example that uses the pipe wrapper with a Gstreamer

 * pipeline. For each decoded frame in the Gstreamer thread, a handle

 * is called in the ecore thread.

 *

 * @code#include <gst/gst.h>

 * #include <Ecore.h>

 *

 * static int nbr = 0;

 *

 * static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);

 *

 * static void new_decoded_pad_cb (GstElement *demuxer,

 *                                 GstPad     *new_pad,

 *                                 gpointer    user_data);

 *

 * static void handler(void *data)

 * {

 *   GstBuffer  *buffer;

 *

 *   printf ("handler : %p\n", data);

 *   buffer = data;

 *   printf ("frame  : %d %p %lld %p\n", nbr++, data, (long 
long)GST_BUFFER_DURATION(buffer), buffer);

 *   gst_buffer_unref (buffer);

 * }

 *

 *

 * static void handoff (GstElement* object,

 *                      GstBuffer* arg0,

 *                      GstPad* arg1,

 *                      gpointer user_data)

 * {

 *   Ecore_Pipe *pipe;

 *

 *   pipe = (Ecore_Pipe *)user_data;

 *   printf ("handoff : %p\n", arg0);

 *   gst_buffer_ref (arg0);

 *   ecore_pipe_write(pipe, arg0);

 * }

 *

 * int

 * main (int argc, char *argv[])

 * {

 *   GstElement *pipeline;

 *   char *filename;

 *   Ecore_Pipe *pipe;

 *

 *   gst_init (&argc, &argv);

 *

 *   if (!ecore_init ())

 *     {

 *       gst_deinit ();

 *       return 0;

 *     }

 *

 *   pipe = ecore_pipe_new (handler);

 *   if (!pipe)

 *     {

 *       ecore_shutdown ();

 *       gst_deinit ();

 *       return 0;

 *     }

 *

 *   if (argc < 2) {

 *     g_print ("usage: %s file.avi\n", argv[0]);

 *     ecore_pipe_free (pipe);

 *     ecore_shutdown ();

 *     gst_deinit ();

 *     return 0;

 *   }

 *   filename = argv[1];

 *

 *   pipeline = _buid_pipeline (filename, pipe);

 *   if (!pipeline) {

 *     g_print ("Error during the pipeline building\n");

 *     ecore_pipe_free (pipe);

 *     ecore_shutdown ();

 *     gst_deinit ();

 *     return -1;

 *   }

 *

 *   gst_element_set_state (pipeline, GST_STATE_PLAYING);

 *

 *   ecore_main_loop_begin();

 *

 *   ecore_pipe_free (pipe);

 *   ecore_shutdown ();

 *   gst_deinit ();

 *

 *   return 0;

 * }

 *

 * static void

 * new_decoded_pad_cb (GstElement *demuxer,

 *                     GstPad     *new_pad,

 *                     gpointer    user_data)

 * {

 *   GstElement *decoder;

 *   GstPad *pad;

 *   GstCaps *caps;

 *   gchar *str;

 *

 *   caps = gst_pad_get_caps (new_pad);

 *   str = gst_caps_to_string (caps);

 *

 *   if (g_str_has_prefix (str, "video/")) {

 *     decoder = GST_ELEMENT (user_data);

 *

 *     pad = gst_element_get_pad (decoder, "sink");

 *     if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {

 *       g_warning ("Failed to link %s:%s to %s:%s", GST_DEBUG_PAD_NAME 
(new_pad),

 *                  GST_DEBUG_PAD_NAME (pad));

 *     }

 *   }

 *   g_free (str);

 *   gst_caps_unref (caps);

 * }

 *

 * static GstElement *

 * _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)

 * {

 *   GstElement          *pipeline;

 *   GstElement          *filesrc;

 *   GstElement          *demuxer;

 *   GstElement          *decoder;

 *   GstElement          *sink;

  GstStateChangeReturn res;

 *

 *   pipeline = gst_pipeline_new ("pipeline");

 *   if (!pipeline)

 *     return NULL;

 *

 *   filesrc = gst_element_factory_make ("filesrc", "filesrc");

 *   if (!filesrc) {

 *     printf ("no filesrc");

 *     goto failure;

 *   }

 *   g_object_set (G_OBJECT (filesrc), "location", filename, NULL);

 *

 *   demuxer = gst_element_factory_make ("oggdemux", "demuxer");

 *   if (!demuxer) {

 *     printf ("no demux");

 *     goto failure;

 *   }

 *

 *   decoder = gst_element_factory_make ("theoradec", "decoder");

 *   if (!decoder) {

 *     printf ("no dec");

 *     goto failure;

 *   }

 *

 *   g_signal_connect (demuxer, "pad-added",

 *                     G_CALLBACK (new_decoded_pad_cb), decoder);

 *

 *   sink = gst_element_factory_make ("fakesink", "sink");

 *   if (!sink) {

 *     printf ("no sink");

 *     goto failure;

 *   }

 *   g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);

 *   g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);

 *   g_signal_connect (sink, "handoff",

 *                     G_CALLBACK (handoff), pipe);

 *

 *   gst_bin_add_many (GST_BIN (pipeline),

 *                     filesrc, demuxer, decoder, sink, NULL);

 *

 *   if (!gst_element_link (filesrc, demuxer))

 *     goto failure;

 *   if (!gst_element_link (decoder, sink))

 *     goto failure;

 *

 *   res = gst_element_set_state (pipeline, GST_STATE_PAUSED);

 *   if (res == GST_STATE_CHANGE_FAILURE)

 *     goto failure;

 *

 *   res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );

 *   if (res != GST_STATE_CHANGE_SUCCESS)

 *     goto failure;

 *

 *   return pipeline;

 *

 *  failure:

 *   gst_object_unref (GST_OBJECT (pipeline));

 *   return NULL;

 * }

 * @endcode

 */





/**

 * Create two file descriptors (sockets on Windows) and add

 * a callback that will be called when the file descriptor that

 * is listened receives data.

 *

 * @param handler The handler called when data is received.

 * @return        A newly created Ecore_Pipe object if successful.

 *                @c NULL otherwise.

 * @ingroup Ecore_Pipe_Group

 */

EAPI Ecore_Pipe *

ecore_pipe_new (void (*handler) (void *data))

{

   Ecore_Pipe       *p;

   Ecore_Fd_Handler *fd_handler;

   int               fds[2];



   p = (Ecore_Pipe *)malloc(sizeof (Ecore_Pipe));



   if (e_pipe(fds))

     {

        free(p);

        return NULL;

     }



   p->fd_read = fds[0];

   p->fd_write = fds[1];

   p->handler = handler;

#ifndef _WIN32

   fcntl(p->fd_read, F_SETFL, O_NONBLOCK);

#endif /* _WIN32 */

   fd_handler = ecore_main_fd_handler_add(p->fd_read,

                                          ECORE_FD_READ,

                                          _ecore_pipe_read,

                                          p,

                                          NULL, NULL);



     return pipe;

}



/**

 * Free an Ecore_Pipe object created with ecore_pipe_new().

 *

 * @param p The Ecore_Pipe object to be freed.

 * @ingroup Ecore_Pipe_Group

 */

EAPI void

ecore_pipe_free(Ecore_Pipe *p)

{

   if (!p)

     return;



   free (p);

}



/**

 * Write on the file descriptor the data passed a parameter.

 *

 * @param p The Ecore_Pipe object.

 * @param The data to write in the pipe.

 * @ingroup Ecore_Pipe_Group

 */

EAPI void

ecore_pipe_write(Ecore_Pipe *p,

                 void       *data)

{

   void *buf[1];



   printf ("ecore_pipe_write : %p\n", data);

   buf[0] = data;

   e_write (p->fd_write, buf);

}



/* Private function */



static int

_ecore_pipe_read(void             *data,

                 Ecore_Fd_Handler *fd_handler)

{

   Ecore_Pipe *p;

   void       *buf[1];

   int         len;



   p = (Ecore_Pipe *)data;

   while ((len = e_read(p->fd_read, buf)) > 0)

     {

        if (len = sizeof (buf))

          {

             printf ("ecore_pipe_read : %p\n", buf[0]);

             p->handler(buf[0]);

          }

     }



   return 1;

}

-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
enlightenment-devel mailing list
enlightenment-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/enlightenment-devel

Reply via email to