Hey,
As I have succeeded in integrating the gstreamer thread in the ecore main
loop on Windows, and as some of you talked about a pipe wrapper, I've
quickly written a wrapper around pipe(), read() and write().
I have attached the source file to put in ecore/src/lib/ecore/ and a diff.
There is a complete example that uses that wrapper with gstreamer (the
test I used on Windows, btw) and the documentation is here too.
That can, maybe, help people to integrate other threads or loops (like the
threaded glib loop) in the ecore main loop.
feel free to comment :)
Vincent
? ecore.patch
? ecore_pipe.diff
? src/lib/ecore/ecore_pipe.c
Index: configure.in
===================================================================
RCS file: /cvs/e/e17/libs/ecore/configure.in,v
retrieving revision 1.247
diff -u -r1.247 configure.in
--- configure.in 25 Jan 2008 03:35:46 -0000 1.247
+++ configure.in 25 Jan 2008 19:39:22 -0000
@@ -730,7 +730,7 @@
ECORE_CHECK_MODULE([IMF], [yes])
try_ecore_imf_evas=no
-if test "x$have_ecore_imf" = xyes -a "x$have_evas" = "xyes"; then
+if test "x$have_ecore_imf" = "xyes" -a "x$have_evas" = "xyes"; then
try_ecore_imf_evas=yes
fi
Index: src/lib/ecore/Ecore.h
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/Ecore.h,v
retrieving revision 1.58
diff -u -r1.58 Ecore.h
--- src/lib/ecore/Ecore.h 25 Jan 2008 18:28:16 -0000 1.58
+++ src/lib/ecore/Ecore.h 25 Jan 2008 19:39:22 -0000
@@ -118,6 +118,7 @@
typedef void Ecore_Event; /**< A handle for an event */
typedef void Ecore_Animator; /**< A handle for animators */
typedef void Ecore_Poller; /**< A handle for pollers */
+ typedef void Ecore_Pipe; /**< A handle for pipes */
#endif
typedef struct _Ecore_Event_Signal_User Ecore_Event_Signal_User; /**<
User signal event */
typedef struct _Ecore_Event_Signal_Hup Ecore_Event_Signal_Hup; /**<
Hup signal event */
@@ -290,6 +291,11 @@
EAPI double ecore_poller_poll_interval_get(Ecore_Poller_Type type);
EAPI Ecore_Poller *ecore_poller_add(Ecore_Poller_Type type, int interval,
int (*func) (void *data), const void *data);
EAPI void *ecore_poller_del(Ecore_Poller *poller);
+
+ EAPI Ecore_Pipe *ecore_pipe_new (void (*handler) (void *data));
+ EAPI void ecore_pipe_free(Ecore_Pipe *pipe);
+ EAPI void ecore_pipe_write(Ecore_Pipe *pipe,
+ void *data);
#ifdef __cplusplus
Index: src/lib/ecore/Makefile.am
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/Makefile.am,v
retrieving revision 1.14
diff -u -r1.14 Makefile.am
--- src/lib/ecore/Makefile.am 11 Jan 2008 07:33:56 -0000 1.14
+++ src/lib/ecore/Makefile.am 25 Jan 2008 19:39:22 -0000
@@ -32,6 +32,7 @@
ecore_tree.c \
ecore_value.c \
ecore_poll.c \
+ecore_pipe.c \
ecore_private.h
libecore_la_LIBADD = @dlopen_libs@ @winsock_libs@ -lm
Index: src/lib/ecore/ecore_private.h
===================================================================
RCS file: /cvs/e/e17/libs/ecore/src/lib/ecore/ecore_private.h,v
retrieving revision 1.46
diff -u -r1.46 ecore_private.h
--- src/lib/ecore/ecore_private.h 11 Jan 2008 07:33:56 -0000 1.46
+++ src/lib/ecore/ecore_private.h 25 Jan 2008 19:39:23 -0000
@@ -241,6 +241,7 @@
typedef struct _Ecore_Event Ecore_Event;
typedef struct _Ecore_Animator Ecore_Animator;
typedef struct _Ecore_Poller Ecore_Poller;
+typedef struct _Ecore_Pipe Ecore_Pipe;
#ifndef _WIN32
struct _Ecore_Exe
@@ -386,6 +387,13 @@
signed char delete_me : 1;
int (*func) (void *data);
void *data;
+};
+
+struct _Ecore_Pipe
+{
+ int fd_read;
+ int fd_write;
+ void (*handler) (void *data);
};
#endif
#include <stdlib.h>
#include "ecore_private.h"
#include "Ecore.h"
#ifdef _WIN32
# include <winsock2.h>
/*
 * Windows stand-in for pipe(): builds a connected pair of loopback TCP
 * sockets and stores them in fds[0] and fds[1].
 *
 * A temporary listening socket is bound to 127.0.0.1 with port 0, the
 * address actually assigned is read back with getsockname(), a second
 * socket connects to it and the accepted connection becomes the other end
 * of the pair. The listening socket is closed before returning.
 *
 * Returns 0 on success. On failure every socket opened so far is closed,
 * both fds[] entries are set to INVALID_SOCKET and -1 is returned.
 *
 * NOTE(review): SOCKET values are stored into int slots; presumably fine on
 * 32-bit Windows -- confirm for 64-bit builds.
 */
static int
e_pipe(int *fds)
{
struct sockaddr_in saddr;
struct timeval tv;
SOCKET temp;
SOCKET socket1 = INVALID_SOCKET;
SOCKET socket2 = INVALID_SOCKET;
u_long arg;
fd_set read_set;
fd_set write_set;
int len;
/* Temporary listening socket, switched to non-blocking mode. */
temp = socket (AF_INET, SOCK_STREAM, 0);
if (temp == INVALID_SOCKET)
goto out0;
arg = 1;
if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR)
goto out0;
/* Bind to the loopback address with port 0, then listen. */
memset (&saddr, 0, sizeof (saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = 0;
saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
if (bind (temp, (struct sockaddr *)&saddr, sizeof (saddr)))
goto out0;
if (listen (temp, 1) == SOCKET_ERROR)
goto out0;
/* Read back the address/port the listening socket ended up with. */
len = sizeof (saddr);
if (getsockname (temp, (struct sockaddr *)&saddr, &len))
goto out0;
/* First end of the pair: a non-blocking socket that connects to temp. */
socket1 = socket (AF_INET, SOCK_STREAM, 0);
if (socket1 == INVALID_SOCKET)
goto out0;
arg = 1;
if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR)
goto out1;
/* Only a connect() that fails with WSAEWOULDBLOCK (connection in progress)
 * is accepted here; an immediate success or any other error bails out. */
if ((connect (socket1, (struct sockaddr *)&saddr, len) != SOCKET_ERROR) ||
(WSAGetLastError () != WSAEWOULDBLOCK))
goto out1;
/* Wait until the listening socket is readable, i.e. has a pending
 * connection.
 * NOTE(review): tv is filled in but never passed to select(); the timeout
 * argument is NULL, so this call waits without a time limit. */
FD_ZERO (&read_set);
FD_SET (temp, &read_set);
tv.tv_sec = 0;
tv.tv_usec = 0;
if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR)
goto out1;
if (!FD_ISSET (temp, &read_set))
goto out1;
/* Second end of the pair: the accepted connection. */
socket2 = accept (temp, (struct sockaddr *) &saddr, &len);
if (socket2 == INVALID_SOCKET)
goto out1;
/* Wait until socket1 is writable, i.e. its connect() has completed.
 * Same remark as above: tv is set but the timeout passed is NULL. */
FD_ZERO (&write_set);
FD_SET (socket1, &write_set);
tv.tv_sec = 0;
tv.tv_usec = 0;
if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR)
goto out2;
if (!FD_ISSET (socket1, &write_set))
goto out2;
/* Put both ends back into blocking mode (FIONBIO with a zero argument)
 * before handing them to the caller. */
arg = 0;
if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR)
goto out2;
arg = 0;
if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR)
goto out2;
/* fds[0] is the connecting socket, fds[1] the accepted one. */
fds[0] = socket1;
fds[1] = socket2;
closesocket (temp);
return 0;
/* Error unwinding: each label closes one more socket than the next.
 * Note that out0 calls closesocket() on temp even when temp is
 * INVALID_SOCKET (first failure path). */
out2:
closesocket (socket2);
out1:
closesocket (socket1);
out0:
closesocket (temp);
fds[0] = INVALID_SOCKET;
fds[1] = INVALID_SOCKET;
return -1;
}
/* Socket flavours of write()/read(). `buf` must be a real array, not a
 * pointer: the number of bytes transferred is sizeof(buf). */
# define e_write(sock, buf) send((sock), (char *)(buf), sizeof(buf), 0)
# define e_read(sock, buf)  recv((sock), (char *)(buf), sizeof(buf), 0)
#else
# include <unistd.h>
# include <fcntl.h>
/* POSIX flavours: thin aliases for pipe()/write()/read(). `buf` must be a
 * real array, not a pointer: the number of bytes transferred is
 * sizeof(buf). */
# define e_pipe(pair)      pipe(pair)
# define e_write(fd_, buf) write((fd_), (buf), sizeof(buf))
# define e_read(fd_, buf)  read((fd_), (buf), sizeof(buf))
#endif /* _WIN32 */
static int _ecore_pipe_read(void *data,
Ecore_Fd_Handler *fd_handler);
/**
* @defgroup Ecore_Pipe_Group Pipe wrapper
*
* These functions wrap the pipe / write / read functions to
* easily integrate a loop that is in its own thread to the ecore
* main loop.
*
* The ecore_pipe_new() function creates file descriptors (sockets on
* Windows) and attaches a handler to the ecore main loop. That handler is
* called when data is read from the pipe. To write data to the pipe,
* just call ecore_pipe_write(). When you are done, just call
* ecore_pipe_free().
*
* Here is an example that uses the pipe wrapper with a GStreamer
* pipeline. For each decoded frame in the GStreamer thread, a handler
* is called in the ecore thread.
*
* @code
* #include <gst/gst.h>
* #include <Ecore.h>
*
* static int nbr = 0;
*
* static GstElement *_buid_pipeline (gchar *filename, Ecore_Pipe *pipe);
*
* static void new_decoded_pad_cb (GstElement *demuxer,
* GstPad *new_pad,
* gpointer user_data);
*
* static void handler(void *data)
* {
* GstBuffer *buffer;
*
* printf ("handler : %p\n", data);
* buffer = data;
* printf ("frame : %d %p %lld %p\n", nbr++, data,
* (long long)GST_BUFFER_DURATION(buffer), buffer);
* gst_buffer_unref (buffer);
* }
*
*
* static void handoff (GstElement* object,
* GstBuffer* arg0,
* GstPad* arg1,
* gpointer user_data)
* {
* Ecore_Pipe *pipe;
*
* pipe = (Ecore_Pipe *)user_data;
* printf ("handoff : %p\n", arg0);
* gst_buffer_ref (arg0);
* ecore_pipe_write(pipe, arg0);
* }
*
* int
* main (int argc, char *argv[])
* {
* GstElement *pipeline;
* char *filename;
* Ecore_Pipe *pipe;
*
* gst_init (&argc, &argv);
*
* if (!ecore_init ())
* {
* gst_deinit ();
* return 0;
* }
*
* pipe = ecore_pipe_new (handler);
* if (!pipe)
* {
* ecore_shutdown ();
* gst_deinit ();
* return 0;
* }
*
* if (argc < 2) {
* g_print ("usage: %s file.avi\n", argv[0]);
* ecore_pipe_free (pipe);
* ecore_shutdown ();
* gst_deinit ();
* return 0;
* }
* filename = argv[1];
*
* pipeline = _buid_pipeline (filename, pipe);
* if (!pipeline) {
* g_print ("Error during the pipeline building\n");
* ecore_pipe_free (pipe);
* ecore_shutdown ();
* gst_deinit ();
* return -1;
* }
*
* gst_element_set_state (pipeline, GST_STATE_PLAYING);
*
* ecore_main_loop_begin();
*
* ecore_pipe_free (pipe);
* ecore_shutdown ();
* gst_deinit ();
*
* return 0;
* }
*
* static void
* new_decoded_pad_cb (GstElement *demuxer,
* GstPad *new_pad,
* gpointer user_data)
* {
* GstElement *decoder;
* GstPad *pad;
* GstCaps *caps;
* gchar *str;
*
* caps = gst_pad_get_caps (new_pad);
* str = gst_caps_to_string (caps);
*
* if (g_str_has_prefix (str, "video/")) {
* decoder = GST_ELEMENT (user_data);
*
* pad = gst_element_get_pad (decoder, "sink");
* if (GST_PAD_LINK_FAILED (gst_pad_link (new_pad, pad))) {
* g_warning ("Failed to link %s:%s to %s:%s",
* GST_DEBUG_PAD_NAME (new_pad),
* GST_DEBUG_PAD_NAME (pad));
* }
* }
* g_free (str);
* gst_caps_unref (caps);
* }
*
* static GstElement *
* _buid_pipeline (gchar *filename, Ecore_Pipe *pipe)
* {
* GstElement *pipeline;
* GstElement *filesrc;
* GstElement *demuxer;
* GstElement *decoder;
* GstElement *sink;
* GstStateChangeReturn res;
*
* pipeline = gst_pipeline_new ("pipeline");
* if (!pipeline)
* return NULL;
*
* filesrc = gst_element_factory_make ("filesrc", "filesrc");
* if (!filesrc) {
* printf ("no filesrc");
* goto failure;
* }
* g_object_set (G_OBJECT (filesrc), "location", filename, NULL);
*
* demuxer = gst_element_factory_make ("oggdemux", "demuxer");
* if (!demuxer) {
* printf ("no demux");
* goto failure;
* }
*
* decoder = gst_element_factory_make ("theoradec", "decoder");
* if (!decoder) {
* printf ("no dec");
* goto failure;
* }
*
* g_signal_connect (demuxer, "pad-added",
* G_CALLBACK (new_decoded_pad_cb), decoder);
*
* sink = gst_element_factory_make ("fakesink", "sink");
* if (!sink) {
* printf ("no sink");
* goto failure;
* }
* g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
* g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
* g_signal_connect (sink, "handoff",
* G_CALLBACK (handoff), pipe);
*
* gst_bin_add_many (GST_BIN (pipeline),
* filesrc, demuxer, decoder, sink, NULL);
*
* if (!gst_element_link (filesrc, demuxer))
* goto failure;
* if (!gst_element_link (decoder, sink))
* goto failure;
*
* res = gst_element_set_state (pipeline, GST_STATE_PAUSED);
* if (res == GST_STATE_CHANGE_FAILURE)
* goto failure;
*
* res = gst_element_get_state( pipeline, NULL, NULL, GST_CLOCK_TIME_NONE );
* if (res != GST_STATE_CHANGE_SUCCESS)
* goto failure;
*
* return pipeline;
*
* failure:
* gst_object_unref (GST_OBJECT (pipeline));
* return NULL;
* }
* @endcode
*/
/**
 * Create two file descriptors (sockets on Windows) and add
 * a callback that will be called when the file descriptor that
 * is listened to receives data.
 *
 * The read end is switched to non-blocking mode so that the read callback
 * can drain it in a loop without stalling the main loop. If any step
 * fails, everything acquired so far is released.
 *
 * @param handler The handler called when data is received.
 * @return A newly created Ecore_Pipe object if successful.
 *         @c NULL otherwise.
 * @ingroup Ecore_Pipe_Group
 */
EAPI Ecore_Pipe *
ecore_pipe_new (void (*handler) (void *data))
{
   Ecore_Pipe *p;
   int         fds[2];

   p = malloc(sizeof (*p));
   if (!p)
     return NULL;

   if (e_pipe(fds))
     {
        free(p);
        return NULL;
     }

   p->fd_read = fds[0];
   p->fd_write = fds[1];
   p->handler = handler;

   /* The read callback loops until the read call stops returning data, so
    * the read end must not block once it has been drained. */
#ifdef _WIN32
   {
      u_long nonblock = 1;

      if (ioctlsocket(p->fd_read, FIONBIO, &nonblock) == SOCKET_ERROR)
        goto error;
   }
#else
   if (fcntl(p->fd_read, F_SETFL, O_NONBLOCK) == -1)
     goto error;
#endif /* _WIN32 */

   /* NOTE(review): the returned fd handler is not kept anywhere because
    * struct _Ecore_Pipe has no field for it; it is only checked for
    * failure here. */
   if (!ecore_main_fd_handler_add(p->fd_read,
                                  ECORE_FD_READ,
                                  _ecore_pipe_read,
                                  p,
                                  NULL, NULL))
     goto error;

   /* The original returned the identifier `pipe` (the libc function)
    * instead of the freshly created object. */
   return p;

error:
#ifdef _WIN32
   closesocket(p->fd_read);
   closesocket(p->fd_write);
#else
   close(p->fd_read);
   close(p->fd_write);
#endif /* _WIN32 */
   free(p);
   return NULL;
}
/**
 * Free an Ecore_Pipe object created with ecore_pipe_new().
 *
 * Both ends of the pipe are closed before the object itself is released.
 *
 * @param p The Ecore_Pipe object to be freed. @c NULL is ignored.
 * @ingroup Ecore_Pipe_Group
 */
EAPI void
ecore_pipe_free(Ecore_Pipe *p)
{
   if (!p)
     return;

   /* NOTE(review): the fd handler registered in ecore_pipe_new() is not
    * stored in the Ecore_Pipe, so it cannot be removed from here; callers
    * should free the pipe only once the main loop no longer polls it. */
#ifdef _WIN32
   closesocket(p->fd_read);
   closesocket(p->fd_write);
#else
   close(p->fd_read);
   close(p->fd_write);
#endif /* _WIN32 */

   free (p);
}
/**
 * Write on the file descriptor the data passed as a parameter.
 *
 * Only the pointer value itself is sent through the pipe, not the memory
 * it points to.
 *
 * @param p The Ecore_Pipe object. Nothing is written if it is @c NULL.
 * @param data The data to write in the pipe.
 * @ingroup Ecore_Pipe_Group
 */
EAPI void
ecore_pipe_write(Ecore_Pipe *p,
                 void       *data)
{
   /* Must be an array: e_write() sizes the transfer with sizeof(). */
   void *buf[1];

   if (!p)
     return;

   buf[0] = data;
   /* The function returns void, so a failed or short write cannot be
    * reported to the caller. */
   e_write (p->fd_write, buf);
}
/* Private function */

/*
 * Fd handler callback registered by ecore_pipe_new(): reads pointer-sized
 * messages from the read end of the pipe and passes each one to the user
 * handler, until the read call returns 0 or an error.
 *
 * data is the Ecore_Pipe given at registration; fd_handler is unused.
 * Always returns 1 (presumably "keep this handler installed" -- confirm
 * against the fd handler documentation).
 */
static int
_ecore_pipe_read(void             *data,
                 Ecore_Fd_Handler *fd_handler)
{
   Ecore_Pipe *p;
   void       *buf[1];
   int         len;

   (void)fd_handler;
   p = (Ecore_Pipe *)data;

   while ((len = e_read(p->fd_read, buf)) > 0)
     {
        /* Dispatch complete messages only. The original condition was an
         * assignment (`len = sizeof (buf)`), which was always true and so
         * handed partially filled buffers to the handler. */
        if ((len == (int)sizeof (buf)) && p->handler)
          p->handler(buf[0]);
     }

   return 1;
}
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
enlightenment-devel mailing list
enlightenment-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/enlightenment-devel